RabbitMQ


Author
|
Earl
Describe
|
该文档介绍RabbitMQ的特性和使用
Git
|
https://github.com/Earl-Li/rabbitmq-demo.git
Last Update
|
2024-01-14

 

RabbitMQ简介

MQ【message queue】,本质是一个队列,遵循 【FIFO】 先入先出原则,只不过队列中存放的内容是message 而已;是一种跨进程的通信机制,用于上下游【消息发送方和消息接收方】传递消息。

MQ 是互联网架构中一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务。

  1. RabbitMQ是流行的消息队列服务软件,是开源的AMQP(高级消息队列协议)实现。

    • 支持Java、Python、C、PHP、Ruby、JavaScript等多种客户端,

    • 用于在分布式系统中存储转发消息,可以实现异步处理、流量削峰、系统解耦,在易用性、扩展性、高可用等方面表现优异。

  2. 课程采用RabbitMQ 3.8.8版本,课程内容包括

    • RabbitMQ的环境搭建、

    • 消息的发送与接收、消息确认、

    • 延迟队列、死信队列、优先队列、惰性队列、

    • 与SpringBoot集成、

    • RabbitMQ集群

MQ消息队列

  1. MQ的引用场景

    • 流量消峰

      如果订单系统最多每秒能处理一万次订单,超过这个阈值系统可能崩溃,在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。使用消息队列做缓冲,可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。

    • 应用解耦

      电商应用中有订单系统、库存系统、物流系统、支付系统。用户创建下单后,如果订单系统耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。

      当转变成基于消息队列的方式后,下订单任务完整会直接结束,并将订单消息传递给消息队列,由消息队列来调用并监督被调用系统的执行。系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障, 提升系统的可用性

    • 异步处理

      【服务之间不需要等待,等当前服务执行结束后会自动通知服务调用者获取数据继续下一步的操作】

      有些服务间调用是异步的,例如 A 调用 B, B 需要花费很长时间执行,但是 A 需要确认 B执行完成的时间以获取执行结果并继续执行后续操作,以前一般有两种方式,都不是很优雅

      • A 过一段时间去调用 B 的查询 api 查询。

      • 或者 A 提供一个 callback api,B 执行完之后调用 api 通知 A 服务。

      使用消息总线可以方便地解决这个问题,A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ, MQ 会将此消息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样 B 服务也不用做这些操作。 A 服务还能及时的得到异步处理成功的消息。

       

  2. MQ的分类

    • ActiveMQ

      很老的MQ,apache开发的

      • 优点:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,消息可靠性较高,丢失数据的概率很低

      • 缺点:官方社区现在对 ActiveMQ 5.x 维护越来越少, 高吞吐量场景较少使用。

    • Kafka

      Kafka主要特点是基于 Pull 的模式来处理消息,追求高吞吐量,是大数据领域内的消息传输杀手锏,专为大数据而生的消息中间件,以其百万级 TPS 的吞吐量名声大噪,大数据领域的宠儿,在数据采集、传输、存储的过程中有着举足轻重的作用。已经被 LinkedIn,Uber,Twitter,Netflix 等采纳。

      • 优点:

        • 卓越的优点就是吞吐量高,单机写入 TPS 约在百万条/秒。时效性 ms 级可用性非常高,

        • kafka 是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采用 Pull 方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;

        • 有优秀的第三方Kafka Web 管理界面 Kafka-Manager;

        • 日志领域成熟;功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

      • 缺点:

        • Kafka 单机超过 64 个队列/分区, Load 会发生明显的飙高现象,队列越多, load 越高,发送消息响应时间变长, 使用短轮询方式,实时性取决于轮询间隔时间, 消费失败不支持重试;

        • 支持消息顺序,但是一台代理宕机后,就会产生消息乱序,

        • 社区更新较慢;

    • RocketMQ

      RocketMQ 出自阿里巴巴的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理, binglog 分发等场景。

      • 优点:

        • 单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到 0 丢失,MQ 功能较为完善,还是分布式的,扩展性好,支持 10 亿级别的消息堆积,不会因为堆积导致性能下降,

        • 源码是 java 我们可以自己阅读源码,定制自己公司的 MQ

      • 缺点:

        • 支持的客户端语言不多,目前是 java 及 c++,其中 c++不成熟;

        • 社区活跃度一般,没有在 MQ核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码

    • RabbitMQ

      2007 年发布,是一个在 AMQP【高级消息队列协议】基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一

      https://www.rabbitmq.com/news.html

      • 优点:

        • 由于 erlang 语言的高并发特性,性能较好; 吞吐量到万级, MQ 功能比较完备,健壮、稳定、易用、跨平台、 支持多种语言 如: Python、 Ruby、 .NET、 Java、 JMS、 C、 PHP、 ActionScript、 XMPP、 STOMP等,支持 AJAX 文档齐全;

        • 开源提供的管理界面非常棒,用起来很好用,

        • 社区活跃度高; 更新频率相当高

      • 缺点:商业版需要收费,学习成本较高

  3. MQ的选择

    • Kafka

      用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。

      大型公司建议可以选用,如果有日志采集功能,肯定是首选 kafka 了。

    • RocketMQ

      为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商订单扣款,业务削峰。

      RoketMQ 在稳定性上更值得信赖,这些业务场景在阿里双 11 已经经历了多次考验,如果你的业务有上述并发场景,建议选择 RocketMQ。

    • RabbitMQ

      结合 erlang 语言本身的并发优势,性能好时效性微秒级, 社区活跃度也比较高,管理界面用起来十分方便,

      如果你的数据量没有那么大, 中小型公司优先选择功能比较完备的 RabbitMQ。

 

RabbitMQ介绍

RabbitMQ 是一个消息中间件:它接受并转发消息。类比于快递站,消息类比为包裹,RabbitMQ就是快递站,快递站接收,存储和转发消息数据,将数据送到用户手里

  1. 四大核心概念

    • 生产者

      生产者是产生数据发送消息给消息中间件的程序【服务】

    • 交换机

      交换机是 RabbitMQ内部的一个重要部件,一方面接收来自生产者的消息,另一方面将消息推送到队列中。

      一个消息中间件可以有多个交换机,每个交换机可以绑定多个队列

      交换机必须明确接收到的消息的处理逻辑,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个由交换机的类型决定

    • 队列

      队列是 RabbitMQ 内部使用的一种数据结构, 尽管消息流经在 RabbitMQ 和应用程序之间,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是该队列的使用方式

      一个消息中间件中可以有多个消息队列,一个交换机与多个消息队列绑定,每个队列对应一个消费者,多个消费者虽然可以对应同一个队列,但是队列中的消息只会发送给这些消费者中的其中一个

    • 消费者

      消费者是一个等待接收消息的程序。 注意生产者,消费者和消息中间件很多时候并不在同一机器上。

      同一个应用程序既可以是生产者又是可以是消费者。

  2. RabbitMQ的核心部分

    RabbitMQ的六大模式

    • 简单模式【Hello World!】

    • 工作模式【Work queues】

    • 发布订阅模式【Publish/Subscribe】

    • 路由模式【Routing】

    • 主题模式【Topics】

    • 发布确认模式【Publisher Confirm】

     

  3. RabbitMQ的工作原理

    【原理图】

    • 黄色部分Broker是RabbitMQ的一个实体,Broker意为中间人、经纪人,表示接受和分发消息的应用,可以是RabbitMQ的服务器,也被称为Message Broker【Exchange是一个消息中间件中的多个交换机,Queue是队列】

    • Virtual host:出于多租户和安全因素设计的,把 AMQP【高级消息队列协议】 的基本组件划分到一个虚拟的分组中,类似 于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建多个 exchange/ queue 等

      • 多租户:每个Broker中可以包含多个Virtual host,每个Virtual host中可以包含多个交换机和队列

    • Connection: 生产者或消费者与消息中间间之间的 TCP 连接

    • Channel表示信道,每个生产者会与MQ建立连接,建立一个TCP连接的开销非常大,效率低;Channel 是在 connection 内部建立的逻辑连接,TCP连接中可以创建多个Channel,如果应用程序支持多线程,通常每个线程会创建单独的信道进行通讯, AMQP method 包含了 channel id 帮助客户端和消息中间件识别信道,所以信道之间是完全隔离的。channel的设计也是为了减少操作系统建立TCP连接的开支,消费这通过信道直接连接交换机,交换机再连接队列

    • Exchange: 消息到达消息队列的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到队列中去。常用的交换机类型有: direct (point-to-point), topic (publish-subscribe) and fanout(multicast)

    • Queue: 消息最终被送到这里等待 consumer 取走

    • Producer是生产者

    • Consumer表示消费者,即消息的接收方

    • Binding:就是交换机与队列间的连线

     

RabbitMQ安装

官网:https://www.rabbitmq.com/download.html

RabbitMQ的运行需要Erlang语言的运行环境,RabbitMQ用的最多的是linux系统的,RabbitMQ的版本需要对应linux系统的版本,使用命令uname -a查看当前linux系统的版本。el7表示linux7

安装步骤

  1. 将以下文件上传至/opt/rabbitmq目录下

  2. 将以下两个文件移动到/usr/local/rabbitmq目录下

  3. 使用以下命令安装对应软件

    • 使用命令rpm -ivh erlang-21.3-1.el7.x86_64.rpm安装erlang环境【i表示安装,v表示显示安装进度】

    • 使用命令yum install socat -y【安装rabbitmq需要安装rabbitmq的依赖包socat】

      yum命令需要去互联网联网下载安装包

    • 使用命令rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm安装rabbitmq

 

安装成功测试

  1. 使用命令chkconfig rabbitmq-server on设置rab bitmq服务开机启动

  2. 使用命令/sbin/service rabbitmq-server start手动启动rabbitmq服务

  3. 使用命令/sbin/service rabbitmq-server status查看rabbitmq服务状态【如果服务是启动状态active会显示running,正在启动会显示activing,inactive表示服务已经关闭】

  4. 使用命令/sbin/service rabbitmq-server stop停止rabbitmq服务

  5. 在rabbitmq服务关闭的状态下使用命令rabbitmq-plugins enable rabbitmq_management安装rabbitmq的web管理插件【执行了该命令才能通过浏览器输入地址http://主机地址:rabbitmq端口号15672访问rabbitmq管理界面,访问rabbitmq需要开启防火墙端口通讯,RabbitMQ本身的端口是5672,15672是管理界面的端口】

    初始账号和密码默认都是guest,第一次登录会显示没有用户只能通过本地登录,此时需要添加一个账户进行远程登录

    【开放rabbitmq防火墙端口通讯】

    【web控制台】

  6. 使用命令systemctl status firewalld查看防火墙状态

  7. 使用命令systemctl stop firewalld关闭防火墙

  8. 使用命令systemctl disable firewalld.service可以设置防火墙下次开机也不会自动启动

  9. 添加用户并设置超级管理员权限以登录web控制台

    • 使用命令rabbitmqctl add_user earl 123456创建账户,账户名earl,密码123456

    • 使用命令rabbitmqctl set_user_tags earl administrator设置用户earl的角色为超级管理员

    • 使用命令rabbitmqctl set_permissions -p "/" earl ".*" ".*" ".*"设置用户权限

      [-p <vhostpath>] <user> <conf> <write> <read>;-p <vhostpath>表示设置vhost的路径,conf表示可以配置哪些资源,user表示用户,write表示写权限、read表示读权限

      上个命令的意思表示对于用户earl设置具有对/vhost1这个virtual host中的所有资源的配置、写、读权限;每个vhost代表一个库,不同vhost中的交换机和队列是不同的

      guest访问不了就是因为没有设置"/"vhost的路径

    • 使用命令rabbitmqctl list_users查看当前rabbitmq server有哪些用户

     

    【MQ的后台管理界面】

    admin路由中就可以增删改查用户

 

 

简单模式

以下演示的就是简单队列模式

【结构图】

在RabbitMQ的安装中已经将MQ【消息缓冲区】安装好了,现在使用Java API实现生产者发送单个消息给消息队列,消息队列获取消息转发给消费者并打印消息,实现消息的通信

创建项目rabbitmq-demo,创建模块01

使用云服务器的一定要把 5627这个端口号打开,5672端口的作用是用于tcp连接;15672的作用是用于http连接。 同时在建立连接时默认端口号是5627 所以在创建连接时不用指定【服务器必须开启5672端口,本地主机linux系统也需要开启5672端口才能访问,否则会连接超时】

commons-io是apache基金会下的

  1. 01模块搭建

    • pom.xml

    • Producer

  2. 测试效果

    必须开放linux的5627端口和15672端口,web控制台访问只需要开启15672端口

    【生成的队列】

    【消息情况】

    一条消息处于就绪状态准备被消费,总消息为1条

  3. 消费者代码

    可以写在同一个包下【同一个服务中】,发送和接收消息都是通过主函数执行的

    • Consumer

      【消费消息】

 

 

 

工作队列

Work Queues【任务队列】,消息被多个工作线程接收,工作线程采用轮询的策略抢夺消息,一个消息只会被处理一次

就是生产者发送了大量消息,此时可能存在多个消费者一起来处理这些消息,这些消费者称为工作线程,这些工作线程采用轮询的策略获取竞争这些消息并同时对消息进行处理

  1. 工作队列结构图

工作线程就是消费者,改了一个名字,多个工作线程

竞争关系是说其中一个工作线程抢到了某个消息,其他线程将无法抢夺该消息

 

  1. 工作队列的代码实现

    生产者大量发送消息,两个工作线程去接收消息,观察两个工作线程的轮询接受消息

    注意,消费者一定不能用junit的测试接口写,否则没有监听的效果

    为了代码复用,把信道创建的代码封装成一个工具类

    两个类的代码相同或者代码基本相同,可以选择EditConfigurations选择Allow parallel run【idea老版本】或者modify option中找到Allow multiple instance【idea新版本】,勾选表示允许一个类启动在不同的进程【?确认是进程还是线程】

    显示的效果是生产者发送带序号的消息,会轮询的被两个工作线程接收

    • 工具类

      封装获取信道的工具类,本例中的每个工作线程即便使用静态代码块都会使用一个全新的连接,这个怎么弄成一个呢

    • 消费者

      注意消费者的channel对象不能写在try后面的括号中,否则无法获取消息队列中的消息,可以写在try语句块的大括号中;生产者的Channel对象可以写在try的小括号中

      回调函数必须定义在方法的大括号中

      【开启如下配置就可以简单修改参数将该类以另一个类的形式启动】

      进行该项配置后就可以根据WorkThread1修改WT1/WT2分别启动实现分别启动两个主函数的效果

      效果在测试效果中有展示

    • 生产者

      将控制台输入的消息传递给消息队列

    • 测试效果

      工作队列采用轮询的策略处理消息

      【web控制台】

      【消息发送】

      【消息接收】

      【消息接收工作线程2】

 

消息应答

消费者完成一个任务可能需要一段时间,在此期间消费者突然挂掉了,如果RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除,我们将丢失正在处理的消息以及后续发送给该消费者的消息。

为了保证消息在发送过程中不丢失, rabbitmq 引入消息应答机制,消息应答就是消费者在接收到消息并且处理该消息之后,告诉rabbitmq它已经对特定消息处理完成, rabbitmq 可以把该消息删除了。

只要工作线程不进行消息应答,队列中的消息是不会删除的

  1. 自动应答

    这种模式仅适用于消费者可以高效并以一定速率处理这些消息的情况下使用

    • 消息发送后立即被认为已经传送成功【消费者接收到消息就马上进行应答,我怎么感觉讲错了,是消息从消息中间件发送就认为传送成功了,因为后面说连接或者信道关闭,消息就丢失了】,

    • 这种模式在高吞吐量和数据传输安全性方面不是很好,因为该模式下如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,消息就会丢失了

    • 另一方面这种模式下消费者没有对传递的消息数量进行限制,没处理完上一个消息下一个消息就发送过来,可能使得消费者由于接收太多还来不及处理的消息导致这些消息积压,最终使内存耗尽,导致这些消费者线程被操作系统杀死

  2. 手动应答

    自动应答在数据安全和系统安全方面问题比较大,一般都推荐使用手动应答,以下列举手动应答的相关方法

    手动应答的好处是可以批量应答并减少网络拥堵

    • Channel.basicAck()

      用于肯定确认,执行该方法RabbitMQ会认为消息被成功处理,可以将该消息丢弃

    • Channel.basicNack(deliveryTag,true)

      否定确认,执行这个方法RabbitMQ会认为当前该消息不能进行丢弃

    • Channel.basicReject()

      否定确认,和上面方法的区别是缺少一个批量处理的参数Multiple,执行这个方法RabbitMQ会认为该消息处理失败且不再进行处理,可以将该数据进行丢弃

      这个讲的不清楚,后面自己研究

  3. Mutiple批量处理参数的解释

    • Channel.basicNack(deliveryTag,true)的第二个参数就是是否采用批量应答的参数

      • 如果是true,表示批量应答Channel上未应答的消息

        信道上的数据并不是一条一条传递的,信道中的数据可能存在好几个独立的消息,头部即当前tag消息才是工作线程下一个要获取的消息,如果批量应答Multiple参数为true,当当前tag对应的消息处理完成后会将信道中所有的消息都做手动确认应答,这种方式如果在处理信道剩余数据过程中消费者宕机,会直接导致信道中剩余的数据丢失【不太确认究竟是处理完再批量应答还是接收到tag对应消息就批量应答,课件就只说tag为8就应答,离谱,课堂上说的是tag对应的已经处理完的消息,就认为是tag对应的消息处理完再批量应答】

      • 如果为false,表示不批量应答信道上未处理的信息,只有当前tag对应的消息处理完后被应答给RabbitMQ

        批量应答存在风险,不建议使用批量应答,即第二个参数设置为false;批量应答虽然速度快,减少网络压力,但是存在消息丢失的可能

 

  1. 消息自动重新入队

    • 如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失), 导致消息中间件无法接收到消费者处理完消息发送的 ACK 确认, RabbitMQ 将发送给该消费者的消息重新排队。如果其他消费者可以处理,它将被重新分发给另一个消费者。

    • 通过这种机制确保消息不会丢失,但是存在消息被重复消费的情况【后面的幂等性会解决重复消费的问题,尚硅谷就这样,逻辑性不连贯,没有老杜讲的好理解,幂等性还是弹幕说的】

 

  1. 消息手动应答代码实现

    工作线程消息处理执行完毕执行手动应答

    此前案例basicConsume方法第二个参数autoAck都设置的true表示自动应答,手动应答需要将其设置为false,并在deliverCallback方法中对消息处理完之后使用方法channel.basicAck()进行手动应答,该方法的第二个参数是是否批量应答,选择false不使用批量应答,处理一个应答一个

    在third包下实现,一个生产者,写两个消费者【为什么这里代码不能改了复用?】,一个消费者接收消息后睡1s,一个消费者接收消息后睡30s,模拟一个工作线程执行时间很长的情况,期间不出问题再手动应答,期间关闭程序不应答断连接检验消息是否丢失【验证消息在手动应答时是不丢失的,会自动放回队列中重新消费】

    结论:

    1. 在某个工作线程处理消息时间很长的情况下,所有的工作线程仍然遵循轮询消息分发的策略

    2. 当某个工作线程接收了一连串消息还没处理完,中途和消息中间件的连接断掉,消息中间件在连接断掉之后会立即将该工作线程还未处理的剩余消息全部重新入队列,再发送给其他建立连接的工作线程

    • 工具类

      【线程睡眠】

      【信道工具类】

    • 消息生产者

      演示在third包下

    • 工作线程1

    • 工作线程2

    • 测试效果

      • 连续发送消息

        【消息生产】

        【工作线程】

        会发现,处理的很慢的一方仍然是轮询的规则,这不会很低效吗,数据量比较大的情况下仍然如此

        【消息队列情况】

      • 处理时间较长没有消息应答处理过程连接直接断掉的情况

        【消息生产】

        【工作线程】

        在处理消息18的时候直接断掉程序

        【处理时间较短的工作线程】

        当工作线程2挂掉以后,已经发送到2的所有消息全部重新入队发送给了1【即从18开始都是工作线程1处理工作线程2还未处理的消息】,不知道发送给2还未处理的消息都存在信道还是存在哪儿

        【消息队列情况】

        断崖掉是关闭了工作线程2导致的

         

         

消息和队列持久化

默认情况下 RabbitMQ 宕机队列和消息就会消失,确保消息不会丢失需要将队列和消息都标记为持久化。

持久化的队列在RabbitMQWeb控制台的queue菜单的Features字段会显示大写D

  1. 实现消息和队列持久化

    • 队列持久化,必须将原来非持久化的队列删除后再次在生产者声明新建同名的持久化队列,原先队列没删除会报错当前队列非持久化

      删除可以在queues中点击对应的队列,在弹出页面点击delete--delete Queue删除原队列

      队列持久化在重启RabbitMQ后队列依然存在【?队列中的消息是否存在】,感觉像只是设置了队列持久化,并没有设置消息持久化,持久化队列中的消息没有设置持久化仍然会丢失

      【持久化队列】

    • 消息持久化

      消息实现持久化需要在消息生产者发送消息时在basicPublish方法的其他参数添加属性MessageProperties.PERSISTENT_TEXT_PLAIN

      尽管这种方式使RabbitMQ 将消息保存到磁盘,但是可能存在消息刚准备存储在磁盘但还没有存储完RabbitMQ就宕机的情况,仍然可能丢数据,但对简单任务队列而言已经够用了。后边会介绍更强有力的"发布确认"持久化策略。

 

不公平分发

RabbitMQ默认是轮询分发,在某种场景下这种策略并不好,如有个工作线程1处理任务的速度非常快,而另一个工作线程2处理速度很慢,此时采用轮询分发策略处理速度快工作线程大部分时间处于空闲状态,处理速度慢的工作线程一直在干活,这种情况下轮询策略效率低下。为了避免这种情况,我们可以通过设置参数 channel.basicQos(1); 开启RabbitMQ的不公平分发,使处理速度快的工作线程分配更多的消息,实际默认设置channel.basicQos(0),就是轮询分发

实际工作场景一般都使用不公平分发,在Channel信道列表能看到信道的Prefetch_count的分发类型

  1. 在消费者接收消息之前设置分发方式为不公平分发

    实质是设置信道容量的大小,采用轮询的方式往信道放消息,信道满了就跳过!!!!

    注意应答方式也要改成手动应答,否则设置的不公平分发不会生效【因为处理完一条数据会应答消息队列,消息队列再回尝试发送数据测试一下一次最多会发送几条】【经过测试是一条,那岂不是处理完一条再发下一条

    • 不设置basicQos的话是一次性平均分发给所有的队列。设置之后限制了一次分发消息的数量,再设置手动确认机制,这样当你还没提交已经处理好的时候他是不会给你消息的,这样才能实现不公平分发。

    • 同一个消息队列相关的每个信道都要设置

  2. 测试效果

    unacked是尚未确认的意思

    【生产者】

    【处理速度快的消费者】

    【处理慢的消费者】

 

预期值

实质就是将信道作为一个未确认消息的消息缓存区,通过限制消息缓冲区的大小【预期值,可以视作滑动窗口的大小】避免缓冲区未确认消息无限制堆积的问题

  1. 预取值【perfetchCount】:信道可以一次性获取队列中c条信息

    【预期值包含了未处理的和当前正在处理的,视为当前正在处理的在信道的头部】

    • 当为0时不限制,所以队列中的消息可以轮询着一次性发完,

    • 当为1时,只能获取一条,处理完获取下一条

  2. 设置预期值的效果

    • 预期值就是信道容纳预期值数量的消息

    • 信道满之前还是按照轮询的规则给每个信道分配直到某个信道堆积到预期值数量的消息,此后接收到应答确认再发消息

      【确认一下是信道中的数据处理完了再重发两条还是处理完一条立马将信道补满,经过确认一应答就补,实际预期值就是信道的最大消息堆积数量】

  3. 同一个队列的不同信道预取值可以设置成不同的数量

 

发布确认

【Publish/Subscribe】

发布确认的核心是RabbitMQ将消息保存在磁盘上以后向生产者发布确认信息,生产者确实收到消息队列发过来的确认消息已经持久化到硬盘上的信息【这里面暗含了三个前提条件:队列必须设置持久化、队列中的消息必须设置持久化、确认设置了发布确认模式】

  • 没有设置队列持久化和队列中消息持久化也是可以设置发布确认模式的,此时消息投递到队列就会向生产者传递确认消息

生产者将信道设置成发布确认模式后,所有在该信道上面发布的消息都将会被指派一个从1开始的唯一ID

  • 没有设置消息和队列是持久化的情况下,当消息被投递到匹配的队列之后,消息队列会发送一个确认给生产者【确认信息中包含了消息的唯一ID】,使得生产者知道消息已经正确到达目的队列

  • 如果消息和队列是可持久化的情况下,确认消息会在消息写入磁盘之后发出,消息队列回传给生产者的确认消息的delivery-tag域中包含了对应消息消息的ID

    • 此外消息队列也可以设置basic.ack的multiple域【批量应答】,表示到当前消息之前的所有消息都已经得到了处理

  • 确认发布模式最大的好处在于他是异步的,生产者可以在等待信道返回确认的同时继续发送下一条消息

    • 当消息最终得到确认之后,生产者可以通过回调方法来处理该确认消息

    • 如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者同样可以在回调方法中处理该nack消息

开启发布确认模式

  1. 开启发布确认模式

    发布确认模式是在发消息前对信道使用confirmSelect方法开启的

 

 

三种发布确认模式

经过测试,三种发布确认模式发送1000条相同消息的总时间分别为460、58、25毫秒

核心是消息中间件确认需要时间,单个发布确认每次都等确认完成再发送下一个;批量发布确认等对方一批确认完成再执行发送下一批;异步发布确认是发送过程不管确认的问题,使用监听线程监听消息确认回调,统一处理后告知发送失败的消息

企业用的都是异步处理,最好用,速度最快

  1. 三种模式的特点

    • 单独发布确认【460ms】

      同步等待确认【每发一条确认一条,不缺认下一条发送不了】, 简单,但吞吐量非常有限

    • 批量发布确认【58ms】

      批量同步等待确认【一批消息一次确认】,简单,合理的吞吐量, 一旦出现问题但很难推断出是那条消息出现了问题

    • 异步发布确认【25ms】

      【发送的时候不管确认】,最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些【多个监听线程和并发跳跃哈希表】

单个发布确认

是一种同步发布确认的方式【即发布一个消息后必须等到该消息被确认发布后,下一条消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回布尔值,确认成功返回true,如果在指定时间范围内这个消息没有被确认那么它将抛出异常

  1. 缺点

    • 发布速度特别的慢

      没有确认发布的消息会阻塞所有后续消息的发布,只有等待当前消息发布确认后才发送下一条,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这已经足够了

  2. 代码实现

    在forth包下进行演示

    打印1000条消息发布总共耗时的时间验证不同模式间的性能差异

    1000条耗时460ms

     

批量发布确认

单个发布确认非常慢,批量发布确认先发布一批消息然后一起确认可以极大地提高吞吐量

waitForConfirms方法的逻辑有点类似执行这个方法去获取消息的确认状态,在消息发布确认的过程中,如果有一个消息出现问题【?后面的消息都会出问题吗,不然的话一个状态的确认怎么能代表所有,除非有一个出问题,都会导致状态置为false,表示这一批出现了问题】,waitForConfirms的结果就不为true,当执行完这个方法状态会被重新置为true,检验下一批的状态,这意为着waitForConfirms方法可以根据设置的位置不同而自主选择消息确认批次中消息数量的多少【如在所有消息发送完成后,是将整个消息作为整体进行发布确认,出了问题只知道本次发送出了问题,也可以设置当发送多少次消息后进行一次发布确认,出了问题可以知道出问题的批次】

 

  1. 缺点

    • 当发生故障导致发布出现问题时,不知道具体是哪个消息出现了问题, 必须将整个批处理消息保存在内存中,记录重要的信息后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布这句话什么意思

  2. 代码实现

    1000条耗时58ms,相比与单个发布确认,速度快了8倍

 

异步发布确认

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都特别高,他是利用消息队列的回调函数来达到消息可靠性传递的

  1. 异步发布确认图解

    消息发在信道中,每个消息都以Map集合的方式,在key中保存消息的序号,消息生产者不需要再关注何时去获取发布确认,会由交换机根据消息序号找到哪些消息发送成功,哪些消息没有收到统一地异步返回给生产者,没收到的生产者再重新发送即可

    异步发布确认的代码实现比较繁琐

  2. 代码实现

    在发送消息之前准备一个消息监听器,监听消息中间件通过信道返回给生产者消息发送成功与否的具体情况

    监听器的重载方法有单参,有双参;单参是只监听成功的,双参是既监听成功的也监听失败的;监听成功和监听失败的接口都是同一个函数式接口的不同实现

    收到确认条数少于发送条数的同学:rabbitmq给的成功回调可能是单条的也可能是批量的,批量时multiple=true,表示该tag及其之前的消息都确认了

    注意主线程主方法执行结束,其他线程会自动结束不再进行打印,所以这里让主线程睡10s,让消息队列回调打印完成,【确实打印到1000截止】

    回调函数中的第一个参数sequenceNumber是消息的序号,从1开始到1000;第二个参数表示当前返回的序号是批量确认还是单个确认,

    异步发布确认再消息发送完成以前就开始批量确认回调通知生产者之前发送的部分消息已经接收到了

     

  3. 处理异步未确认消息的方式

    思路是将未确认消息重新发送或者将未确认消息保存起来以后再重新发送

    解决方案是监听线程把未确认的消息放到一个基于内存的能被发布线程访问的队列,如 ConcurrentLinkedQueue 【并发链式队列,JUC有讲】,这个并发链式队列在监听线程【暂时认为确认回调和未确认回调在一个线程中】与发布线程之间进行消息的传递

    • 在消息发布的时候生产者就要将所有消息记录在并发链式队列【后改用并发跳越哈希表,因为消息队列只返回序号,需要序号把消息对应起来】中

      跳表是有序链表,发布确认模式下消息的编号是从1开始的

      选择并发跳跃哈希表的原因是序号和消息对应,轻松添加和删除

      支持高并发,可以多线程访问,存操作和删操作可能同时进行,但是同时操作的对象不可能是同一个

    • 在确认回调删除已经被确认的消息,剩下的就是未确认的消息

      核心是生产者发消息的同时向并发跳跃哈希表添加消息和消息序号的key-value键值对

      在确认回调用headMap获取当前确认消息序号到首个元素的子跳表,用子跳表的clear方法删除跳表中的对应子跳表,并发跳跃哈希表中剩余的就是未被确认的【?疑惑,如果之前有确认失败的,后续headMap不会一起给删了吗?是否需要单独在失败回调中将确认失败的消息单独取出存起来?学了JUC来看

       

    • 批次确认的部分消息

      序号从1开始,一直到1000,并发哈希跳表最后剩下的是未被确认的数据,

      疑问:这里主线程噶了,其他线程也会噶,即打印会中断,主线程噶了不是守护线程噶吗

      【守护线程噶了导致监听线程噶】

 

交换机

控制台中的Exchanges的AMQP default是默认交换机,发送消息指定交换机为空串就会走默认交换机

通常生产者生产的消息不会直接送到队列,生产者都不知道这些消息传递到了哪些队列,生产者只负则将消息传递给交换机

  1. 交换机介绍

    • 默认情况下,一个消息只能被一个工作线程消费一次;【生产者生产的消息从不会直接发送给队列,这种情况下不需要使用交换机课程说的是错的,这种情况下还是会使用交换机,使用的是默认交换机AMQP default

    • 可能存在一种工作场景,一个消息需要被多个工作线程消费【这种情况由交换机绑定到多个队列,消息同时发送到多个队列,每个队列中的消息只能被消费一次,从而实现同一个消息被消费多次】

      这种模式被称为发布订阅模式

  2. 交换机的概念

    消息能路由发送到队列是由routingKey【bindingKey】指定的队列,此前指定默认交换机时通过第二个参数指定队列名就是指定的bingdingKey

    • 交换机的工作内容

      • 接收来自生产者的消息,

      • 将消息推入队列

        交换机必须确切知道如何处理收到的消息

        由交换机的类型决定应该把消息放到特定队列或把消息放到许多队列中又或者丢弃这些消息

    • 交换机的类型

      • 直接类型【direct】

        直接类型也叫路由类型

      • 主题类型【topic】

      • 标题类型【headers】

        头类型,在企业中已经不常使用了

      • 扇出类型【fanout】

        扇出类型就是发布订阅模式

      • 无名类型

        无名类型就是默认的交换机类型,通过空字符串进行标识

 

临时队列

临时队列是未设置持久化的对列,一旦RabbitMQ打开消费者连接,会被自动删除的队列?【队列不是由生产者声明创建的吗】

这里估计讲错了,是RabbitMQ一旦重启,该队列就会被删除,经过测试,即便队列为空,生产者和消费者都断开连接临时队列依然存在

  1. 通过信道指定队列名创建临时队列

    在发送消息前声明队列的名称等参数

  2. 通过信道队列声明的getQueue方法创建随机队列名的队列,并返回队列名称

    【随机临时队列效果】

    AD、Excl就是表示临时的意思

 

 

绑定

binding指定了交换机和队列之间的对应关系,RountingKey是用户自定义的关键词,认为RountingKey是绑定关系的标识,交换机通过RountingKey将消息路由到对应绑定的队列【一个交换机可以绑定多个队列,生产者可以通过RountingKey指定交换机把消息发送给指定的队列而非所有与交换机绑定的队列】

通过rountingKey可以实现由生产者随意决定消息的发送方式

  1. 绑定实操演示

    【定义交换机】

    【定义队列】

    【交换机绑定队列】

     

 

发布订阅模式

扇出类型,翻译成扇出【Fanout】,其实就是发布订阅模式

将接收到的所有消息广播到对应扇出类型交换机绑定的所有队列中

系统自带一个发布订阅交换机,名字叫做amq.fanout,除此以外还可以自定义一个发布订阅的交换机而不使用系统自带的

卧槽,大家都说RoutingKey和扇出模式无关,只要交换机是扇出模式,那么其绑定的队列都会收到消息,经过验证,确实如此,即使RountingKey和生产者设定不同,仍然能接收到消息

发布订阅模式在SpringBoot中的绑定没有设置RoutingKey的方法,因为不需要绑定,原生代码绑定了也没有效果

【RountingKey】

【测试效果】

  • 这里将队列2的RountingKey改成了123,注意123是字符串的形式

  1. Fanout实现结构梳理

    实现在fifth包下,构建一个简单的日志系统。生产者将发出日志消息,启动两个消费者,一个消费者接收到消息后把日志存储在磁盘, 另外一个消费者接收到消息后把消息打印在屏幕上,以验证一个生产者发出的消息被广播给fanout类型交换机绑定的所有消费者

    【项目结构】

    交换机名为logs,绑定两个随机队列,RountingKey两个都设置为空串【即什么都不写】,实现生产者发送的消息同时被消费者接收到并打印

  2. 代码实现

    要点:

    • 生产者、消费者都可以对交换机和队列进行声明,且只需要声明一次,在声明一次的情况下,声明的程序必须首先启动,否则即使创建了交换机生产者发送第二条消息的时候也会报错

    • 生产者发送信息使用了交换机可以不指定队列,此时只有4个参数,第二个参数是routingKey,交换机会自动根据绑定的队列和routingKey将消息发送到指定队列中

    • 队列的声明最好放在消费者一侧,因为生产者在有交换机和routingKey的情况下,不用关心具体将消息发送给哪一个队列,只需要发送给交换机,交换机根据信息自动裁定;但是消费者需要和队列进行绑定,必须知道队列的名称,如果使用随机临时队列,在消费者一侧声明,basicConsume方法接收消息的队列名参数会很方便,同时绑定交换机和队列也很方便

    • 生产者

    • 消费者1打印日志

    • 消费者2生成覆盖日志文件

    • 执行效果演示

      【生产者】

      【消费者1】

      【消费者2】

      【生产者没有声明交换机且启动顺序错误报错】

     

     

路由模式

路由模式也称直接交换机,直接模式根据RoutingKey和交换机精确匹配队列;扇出模式忽略RoutingKey,向所有与交换机绑定的队列发送消息【已经验证】

同样是构建日志系统,希望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志消息避免浪费磁盘空间。扇出模式不会对队列进行区分,在这种场景下可以使用直接模式让消息只去指定RountingKey对应的队列中去

  1. 要点

    • 一个队列可以和一个交换机存在多个绑定关系,每个绑定关系对应1个RoutingKey

      像图上这种情况使用任意一个RoutingKey消息都能路由到console队列,组合起来作为新的RoutingKey消息会被丢弃

      【多个RoutingKey结构图】

    • 多重绑定,多个队列相同的RoutingKey

      这种情况下,直接模式的表现效果类似扇出模式,会将消息向指定RoutingKey的所有队列传递

      【多重绑定结构图】

      生产者指定RoutingKey为black,消息会同时传递给队列Q1和Q2

  2. 代码实现

    sixth包下,交换机名为direct_logs,两个队列console和disk,

    • 生产者

    • 消费者1

    • 消费者2

    • 测试效果

      结论:RoutingKey为error、warning和info的都分发到对应的队列中去了,其他RoutingKey和组合RoutingKey对应的消息丢弃

      【生产者发送消息】

      【消费者1接收到消息】

      【消费者2接收到消息】

      【交换机绑定情况】

 

主题模式

【topic】直接交换机不可能同时路由两个RoutingKey不同的队列,如果某天存在这样的需求,只能使用Topic模式

  1. 特点

    • topic交换机的 routing_key 必须是一个单词列表,

      • 单词间以点号分隔开,注意经过只有一个单词也可以正常使用

      • 单词列表的长度不能超过 255 个字节

      • 一个队列可以被多个RoutingKey单词列表路由,一个队列的多个RoutingKey都匹配,消息也只会被该队列接收一次

      • 不匹配任何RoutingKey单词列表的消息会被丢弃

      • *(星号)可以代替一个单词

      • #(井号)可以替代零个或多个单词【一个队列RoutingKey是#,那么这个队列将匹配所有的RoutingKey接收所有数据 】

      如"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit"

      *.orange.* 中间为 orange 长度为3个单词的字符串

      lazy.# 第一个单词是lazy的单词列表

    • 示例

      消息的RoutingKey为quick.orange.rabbit的能同时匹配Q1和Q2队列

      消息的RoutingKey为lazy.pink.rabbit的同时满足Q2的两个RoutingKey,但消息只会被接收一次

  2. 代码实现

    交换机为topic_logs,类型为主题交换机;对列设为Q1、Q2,RoutingKey设置如上图所示

    发送消息,验证消息RoutingKey设置为不同列表队列的接收情况

    • 生产者

      注意必须消费者完全启动,生产者发送消息才会生效

    • 消费者1

    • 消费者2

    • 测试效果

      【生产者消息发送】

      一共发送10条,Q1接收者会显示所有Q1能收到的,有4条;Q2接收者会收到所有Q2能收到的,有5条;有3条丢弃

      被两个都收到的有2条【4+5-2+3=10】

      【消费者1接收消息】

      【消费者2接收消息】

      【交换机绑定情况】

 

死信队列

死信:无法被消费的消息,某些时候可能由于特定的原因导致队列中的某些消息无法被消费,这样的消息如果没有即时处理,就变成了死信,?死信队列是有死信的队列还是全是死信的队列【感觉像将无法消费的信息专门放在一个队列方便有条件了再处理,这样的队列称为死信队列】。

绑定死信交换机的队列的Features字段会显示DLX和DLK,分别表示死信交换机和绑定死信交换机和死信队列的RoutingKey

应用场景:

  • 为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中,等待环境好转之后再将死信队列中的消息进行消费,防止消息丢失

  • 死信队列可以做一些延迟消息的处理,死信可以在指定的时间内被消费者消费

    • 如用户在商城下单成功并点击去支付后在指定时间未支付时消息的自动失效

 

  1. 死信的来源

    • 消息TTL【Time to Live】消息存活时间过期

      过期的消息贝能再被消费

    • 队列达到最大长度

      队列满了,无法再添加数据到消息队列中

    • 消息被拒,消息在应答的时候进行了拒绝应答【basic.reject】或者否定应答【basic.nack】且requeue=false设置了消息不放回队列中

      让这种消息不要重新放在队列中进行消费,将其放在死信队列中等后期有条件了再进行后续处理

  2. 死信场景搭建

    • 场景架构

      一个生产者走直接交换机,正常情况通过RoutingKey=zhangsan被消费者C1消费,消息遇上三种情况之一成为死信,死信会被马上转发到死信交换机【是一个直接交换机dead_exchange】,通过自定义RoutingKey=lisi转发到死信队列被C2消费

      消费者包括C1正常队列消费者和C2死信队列消费者

      队列包含正常队列和死信队列

      交换机包含一个正常交换机和一个死信交换机【都是直接类型,一个和正常队列绑定,一个和死信队列绑定】

      生产者一个

      要点:

      • 在C1中要声明两个交换机和两个队列,因为要让正常队列出现死信立刻转发给死信交换机

      • C2消费者正常写,只负责消费死信队列中的消息【这个逻辑还是不复杂,因为死信队列的消息一过来就被另一个消费者正常消费了】

消息过期

  1. 代码实现

    在控制台可以看见普通队列的Message字段的Ready到消息过期时间会递减,死信队列的Ready会递增,注意这里面似乎还有延迟,即在完全确认死信队列收到消息以前,原队列的消息不会立即删除

    场景:在消费者中声明2个交换机和两个队列,普通队列声明时设置参数绑定死信交换机;开启消费者1创建对应的交换机和队列后关闭消费者1模拟正常消费者宕机,在生产者中设置消息的过期时间,让普通队列中的消息等待足够时间过期自动进入死信队列【为了观察到消息进入死信队列的渐进效果,设置消息每隔1s发送一次】,死信完全进入死信队列后,启动消费者2消费死信队列中的消息

    • 消费者C1

    • 消费者C2

    • 生产者

    • 测试效果

      【生产者发送消息】

      【消息发送到普通队列】

      【消息超时进入死信队列】

      【启动消费者死信被消费】

     

队列达到最大长度

指普通队列达到最大长度后放不下的消息会立即成为死信

 

  1. 验证流程:

    • 通过在消费者1中普通队列声明其他参数补上x-max-length,6设置普通队列的长度仅为6,第二个参数是int类型,

      • 更改队列属性一定要将原队列删掉【可以在声明队列时把autoDelete设置为true,这样就不用每次手动删除队列了,每次断开链接会自动删除】

      • 限制了最大长度的队列会在Features字段显示Lim表示限制了长度【?搜索一下RabbitMQ的队列长度是多少,如何设置】

        限制了长度显示lim,其他参数features会显示args,在args中显示对应的参数

      • 未能进入队列的消息将会成为死信被转发到死信交换机;

    • 把生产者的消息TTL设置成没有过期时间【为了效果明显】,只启动生产者,让消息在队列中积压,观察两个队列的数据数量

  2. 测试

    • 删除正常队列

    • 启动消费者1

      生成被限制了长度的队列后关闭消费者1

    • 启动生产者

    • 测试效果

       

消息被拒

一般队列对应的消费者拒绝对应消息,该消息可以被设置重新放回队列,也可以选择立即成为死信转发到死信交换机

使用消息拒绝必须关闭自动应答,使用手动应答的方式确认消息,自动应答不存在消息拒绝

在queue界面中能点击get message获取当前队列中有哪些消息

  1. 验证流程

    • 将普通队列声明中的自动应答改成手动应答,当消息匹配info3时拒绝该消息,其他消息时手动确认应答

      拒绝消息在消息接收回调中,实际拒绝还是接收到了该消息,只是使用拒绝方法以后让消息重新回到队列或者直接转发到死信队列

    • 删除普通队列并重新生成

    • 生产者发送消息

  2. 测试效果

    • 发送消息

    • 接收消息

    • 死信队列中的消息

      properties中的queue为一般队列表示从一般队列转发来的,这张图确认是在死信队列中截取的

 

 

延迟队列

延迟队列是死信队列的一种,延迟队列就是TTL过期,没有配置死信队列过期消息会被丢弃,配置了死信队列消息会被发送到死信队列,不设置TTL表示消息永远不会过期,

通过死信队列消息过期的演示,生产者将带有有效期限的消息发送给绑定一般消费者的直接交换机,一般消费者宕机,消息等待10s后变成死信,死信被转发给死信交换机,发送给死信队列,死信队列发送给死信消费者,这期间消息从生产者到消费者,中间经历的时间是消息的有效时间10s,那么完全可以让消息变成死信后被消费来实现让消息保持一定的时间后再被消费的需求

核心就是消息超时变成死信+消费者一直消费死信

总结:

  • 两种延迟队列,其中一种是基于死信的,一种是基于插件的;使用RabbitMQ实现延迟队列可以很好的实现RabbitMQ的特性【消息发送和投递的可靠性、死信队列保障消息至少被消费一次以及消息未被正确处理时成为死信不会被丢弃】,通过RabbitMQ集群特性不会让RabbitMQ单个节点挂点导致延时队列不可用或消息丢失

  • 还有其他实现延迟队列的选择,Java中的DelayQueue【消息可能丢失】,Redis的zset,Quartz【定时器】或者Kafka的时间轮,根据特点和场景实现;RabbitMQ更加可靠

  1. 延迟队列的应用场景

    都是设定消息的有效时长实现在某个事件发生之后或者之前指定时长进行处理,这里面的判断条件都在消息消费时判断吗?【好像是】

    数据量较小的情况下,可以使用定时任务每隔几秒查一下条件数据状态,条件成立就执行后续操作【如支付时间一周的账单每晚跑一次定时任务检查一下支付状态】;

    数据量比较大且时效性比较强的场景,如十分钟未支付取消订单,活动期间数据量可能达到百万甚至千万,并发量高,对这么多的数据使用定时任务查数据库状态响应时间慢,数据库压力大,性能低下,还可能耗死服务器

    • 订单十分钟未支付自动取消

    • 新创建的店铺十天内没有上传过商品自动发送消息提醒

    • 用户注册三天内没有登录发送短信提醒

    • 用户发起退款三天内没有得到处理则通知运营人员

    • 预定会议后在预定时间点前十分钟通知相关人员参加会议

  2. 业务逻辑流程示例

    • 用户下订单后会预定座位,订单超30分钟未支付座位重新回票池【这个就是消息队列触发的,将座位添加会坐席数据库中】,订单取消

SpringBoot整合RabbitMQ

建一个SpringBoot工程

  1. pom.xml

  2. application.yml

  3. 启动类

  4. 配置swagger

 

延迟队列实现

整合SpringBoot实现向RabbitMQ发送消息,SpringBoot对RabbitMQ的javaApi进行了封装

  1. 架构图

    三个队列,分别为QA、QB【QA、QB为普通队列】、QD【死信队列】,X为普通交换机,Y为延迟交换机;

    设置两个普通队列的延迟时间分别为10s和40s,不同的业务选择不同的RoutingKey就能够匹配不同的延迟时间

    P发消息,C接收消息

  2. 在原始的RabbitMQ代码中,死信队列绑定在普通队列的其他参数中,整合了SpringBoot后,专门就有一个配置类去配置声明普通交换机,死信交换机...,普通队列、死信队列,不需要消费者或生产者再负责交换机和队列的声明

    声明包括两个交换机和三个队列,两个交换机和3个队列的绑定关系,两个普通队列与死信交换机的转发关系

    • 配置类

      要点:

      1. 交换机、队列和绑定都需要以向Spring容器注入的方式来实现声明和创建,简单的声明只需要使用相应的类传参名字即可,复杂的声明需要使用对应的Builder,如ExchangeBuilder、QueueBuilder和BindingBuilder,这些对象都是org.springframework.amqp.core.包下定义的,用法基本见名知意,按名字设置即可

      2. 队列和死信交换机的关系只需要在队列声明中传参死信交换机的名字和RoutingKey,凡是需要转发到死信交换机的队列都要单独进行传参,参数传递仍然使用map,参数名和原来的相同

    • 生产者

      发送延迟消息的控制器方法

      通过控制器方法实现通过请求的方式使用rabbitTemplate传参交换机,RoutingKey和消息本身实现向消息队列传递消息

    • 消费者

      使用RabbitListener注解指定监听的队列实现对消息的处理,实际肯定是用了反射,接收到队列QD的消息,获取到消息,调用该方法进行对消息的处理

    • 测试效果

      消息发送以后10s和40s收到延迟消息

 

延迟队列优化

上述延迟队列的不足之处

  • 每增加一个新的时间需求,就要增加一个新队列,对于预定会议室这种提前通知的场景,这种设计需要增加无数个队列

  • 延迟时间也可能临时改

为啥不能发送消息的时候指定消息的有效时间,是可以的,这里只是作为讲解加深印象,添加一个没有设置消息有效时间的通用队列,发消息时指定消息的有效时间,通过该队列随意控制消息的延迟时间

  1. 优化架构

  2. 实现

    • 在RabbitMQ配置类中添加配置文件类代码,

      添加不设置消息过期时间的QC,这种有一种很明显的缺点:官网还专门写了一个警告,消息可能已经过期了但是没有到队列头会被困在队列里。直到轮到该消息到队列头才会被转发到死信队列被消费

      重点是生产者如何使用SpringBoot的api发送消息,在convertAndSend方法的第四个参数中设置函数式接口CorrelationData的实现类,设置其中的message.expiration来设置消息的有效时长,传递的是字符串的时间毫秒数

    • 在生产者发送指定延迟时间的消息

      在convertAndSend方法的第四个参数中设置函数式接口CorrelationData的实现类,设置其中的message.expiration来设置消息的有效时长

    • 测试效果

 

延迟队列缺陷

基于死信存在的问题,即消息可能已经过期了但是还没有到队列头会被困在队列里,直到轮到该消息到队列头才会被转发到死信队列被消费

就是不设置消息存活时间的队列,可能存在消息到期了但是不在队列头出不去,直到在其前面的所有消息都过期了才能出队列被消费,无法形成一个通用的延时队列,使用过程中基本上必出现消息过期但是被卡的情况

  1. 缺陷情况演示

    • 向普通队列QC先后发送请求http://localhost:8001/order/custom/你好1/2000http://localhost:8001/order/custom/你好2/2000

      使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

  2. 安装RabbitMQ插件解决延迟队列缺陷

    • 官网下载插件rabbitmq_delayed_message_exchange,放在RabbitMQ的插件目录/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins

      这个插件不会实时更新,一直会维持放进去时候的情况

    • 执行命令让插件生效并使用命令systemctl restart rabbitmq-server重启RabbitMQ

      安装不需要写插件的版本号

    • 弄好之后在前端控制台的exchange列表中点击添加交换机多出来一个x-delayed-message类型的交换机,同时也意味着延迟消息不由队列控制,由交换机来控制

       

基于插件的延迟队列

  1. 架构图

  2. 代码实现

    • 配置类

      声明交换机,交换机必须是延迟类型;声明队列;绑定延迟交换机和队列

      • 核心是自定义类型声明延迟交换机,设置参数k=vx-delayed-type=direct,构建交换机传参类型x-delayed-message

      • 构建绑定对象除了传参队列对象,延迟交换机,路由key,还要使用noargs方法构建,注意这个RoutingKey好像是固定的就为delayed.routingkey

    • 生产者

      注意correlationData对延迟交换机设置delay属性

      ?设置延迟是对交换机设置的吗?设置延迟对设置消息在队列中的过期有效果吗

    • 消费者

      正常接收即可

    • 测试效果

      先后发送链接:http://localhost:8001/order/delay/订单消息/20000http://localhost:8001/order/delay/订单消息2/2000

 

发布确认高级

生产环境中由于一些不明原因导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。如何才能进行 RabbitMQ 的消息可靠投递呢?且RabbitMQ 集群不可用的极端情况下,无法投递的消息该如何处理呢?此时生产者的报错信息为队列不可用异常,提示队列不存在或消息队列不可用

消息发送后一直得不到确认就会报异常,不能一直等着回应,把消息丢失,然后就引入了发布确认高级模式,等不到交换机和队列确认应答就叫消息放入缓存,使用定时任务发送缓存消息

发布确认是生产者和交换机之间的事情,消息应答才是交换机、队列和消费者之间的事情

  1. RabbitMQ重启期间的两种消息丢失情况

    生产者不知道消息队列的情况,只管发送消息,消息发送出去找不到交换机或者队列,消息就没了

    • 队列不可用

    • 消息队列整体挂掉,交换机不可用

  2. 解决方案

    • 应该存在一个缓存,当消息经过交换机找不到队列暂时进入缓存,或者消息找不到交换机暂时也进入缓存,通过定时任务对未成功发送的消息重新投递

交换机不可用

以下代码只能针对交换机不可用的情况,对交换机收到消息,但是队列找不到的情况毫无办法

就是通过实现rabbitTemplate的一个回调接口,通过一个标志交换机是否接收到消息来分流对消息的处理

  1. 代码实现

    • 配置类

      一个直接交换机,一个队列,一个绑定关系

      正常绑定

    • 生产者

      情况1:正常模拟消息发送被交换机和队列接收被消费回调函数执行情况

      情况2:把交换机的名字写错模拟找不到交换机的情况,观察回调函数的执行情况和消费者消费情况

      情况3:把RoutingKey写错,让交换机找不着队列,观察观察回调函数的执行情况和消费者消费情况

      回调的消息是发送的时候创建CorrelationData对象,设置消息的id,消息会被自动放入该对象在回调的时候传入,区别于以前的消息发送是不带该参数的重载方法

    • 回调实现类

      核心是交换机【注意这里不涉及队列是否接收到】不管是否接到消息都会回调,用ack标志接收状态来区分回调函数对数据的处理,但是处理不了队列找不到的情况

      回调接口RabbitTemplate.ConfirmCallback的实现类必须通过标注了@PostConstruct的init方法注入rabbitTemplate实例的confirmCallback属性,否则实现类即使注入到Spring容器,消息发送者rabbitTemplate也找不到

    • 消费者

      正常写法消费消息

    • Spring配置类开启发布确认功能

      回调接口的使用还必须在配置文件配置spring.rabbitmq.publisher-confirm-type=correlated

      • 属性值none表示禁用确认发布模式,这也是默认设置;

      • correlated表示发布消息成功到交换器后会触发回调方法;

      • simple有两个效果

        • 其一效果和 CORRELATED 值一样会触发回调方法,

        • 其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法【特制同步确认消息中的单个确认,效率很低,不咋用】,等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到 broker

    • 测试效果

      核心:

      • 情况1:交换机接收消息成功回调进行通知,消息成功被消费

      • 情况2:交换机找不到成功回调对未接收消息进行后续处理【可能缓存起来用定时任务处理】,消息未被消费

      • 情况3:交换机接收消息但是找不到队列,发现回调只是调用交换机成功接收到消息的回调,队列仍然没有接收到消息,消息没有被消费,消息丢失,至此,方案还不完善

 

 

队列不可用

仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息, 如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。 通过消息回退可以在当消息传递过程中不可达目的地时将消息返回给生产者。

  1. 代码实现

    • 消费者

      要点

      • 配置类实现RabbitTemplate.ReturnCallback接口,在returnedMessage中对回退消息进行处理

      • rabbitTemplate.setMandatory(true);配置mandatory属性为true开启回退消息功能,也可以在Spring配置文件配置spring.rabbitmq.publisher.returns=true开启

    • 测试效果

      消息还是原来的消息【生产者、消费者、队列,绑定关系】,注意:

      情况2:交换机找不到没有走该接口,仍然走的RabbitTemplate.ConfirmCallback接口的实现类

      情况3:只有队列没接收到才走的RabbitTemplate.ReturnCallback接口的实现类

 

交换机备份

  1. 项目架构图

    核心思想:消息无法被确认交换机接收自动转发给备份交换机【扇出类型】,备份交换机将消息一方面转发给备份队列进行消息备份,另一方面将消息转发给警告队列进行预警

    当mandatory 参数【消息回退】与备份交换机一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先级高,经过测试显示答案是备份交换机优先级高【即优先备份交换机,不走回退】

    需要声明:

    • 确认交换机和确认队列【这俩已实现】,备份交换机,确认队列、备份队列、警告队列

    • 正常声明交换机、队列和组件;在确认交换机中使用代码绑定备份交换机为确认交换机的消息转发交换机,用withArgument传递参数

  2. 代码实现

    • 配置类

      注意更改了确认交换机的绑定关系,让其接收不到消息转发到备份交换机

    • 警告消费者

       

    • 测试效果

      仍然是使用消息回退的生产者发送消息,注意删除原来的确认交换机,改了其消息流向

      要点:

      • 情况1:走确认发布机--确认队列--正常消费者

      • 情况2:找不到确认交换机,调用交换机不可用的回调接口实现类让生产者对数据进行处理

      • 情况3:在消息回退和交换机备份同时开启的情况下,不再走消息回退,转而走备份交换机,备份交换机的优先级更高

      【数据保存在备份队列中】

 

其他知识

幂等性问题

重复提交,比如用户购买商品后点击支付,支付扣款成功,但返回结果时网络异常,此时用户多次点击,发生多次扣款并生成多条扣款记录,以往的单系统应用,将数据操作放入事务中,发生错误立刻回滚,但再响应客户端的时候也可能发生网络中断或者异常

幂等性就是为了让用户的同一个操作发起一次或者多次请求的结果是一致的,不会因为多次点击产生副作用

幂等性问题就是消息队列应答ack网络中断导致的消息重复消费的问题

  1. 消息重复消费的可能性

    • 消费者在消费消息返回ack【应答】时网络中断,MQ无法收到应答消息,会把已经消费的消息发给其他消费者重复消费,造成重复消费

  2. 幂等性问题的解决思路

    • 加一个验证消息是否消费过的流程,在消息生成时一同生成一个全局唯一的id,每次消费消息前先对消息进行判断是否消费过

  3. 消费端幂等性问题的保障

    海量订单生成的业务高峰期,生产端可能重复产生消息,通过消费端实现幂等性,让即使收到一样的消息也永远不会被消费多次,

    • 业界主流的幂等性有两种操作

      • 唯一ID+指纹码机制,利用数据库主键去重

        指纹码:按一些规则或时间戳加别的服务拼接出的唯一信息码,利用id查询是否已经处理过,优势是信息拼接简单,信息基本由业务规则拼接而来;劣势是高并发场景下,单个数据库有写入性能瓶颈,可以采用分库分表提升性能,但是不建议

        这个方式不是最佳的,最佳的方式是下一个利用redis的原子性解决

      • 利用redis原子性实现

        用redis的setnx命令,天然就具有幂等性,实现不重复消费

        这个很常用,但是怎么用没说

 

优先级队列

场景:淘宝订单催付功能,客户在天猫下单,淘宝会将订单推送给客户,但当客户没有即时付款,淘宝会给用户发一条短信提示,但是一般能创造很大利润的大商家的订单会先处理,这种大商家会先发短信;后端用redis做消息队列不能实现有优先级的场景,订单量大了以后用RabbitMQ进行改造和优化,发现是大客户就给一个相对较高的优先级,否则就用默认的优先级

RabbitMQ中就有对优先级队列的实现,为每个消息分配一个优先级,每次发送消息前对消息进行优先级排序,优先级大的即便在队列的尾部也是排到队列的前面

  1. 优先级队列原理说明

    • 队列消息正常情况

      /前面是消息,后面是消息的优先级,出队列到消费者对消息根据优先级排序,优先级大的先出队列

    • 排序后的优先级队列

  2. 优先级队列的控制台操作

    企业一般都用代码操作

    • 点击队列--添加队列--Maximum priority--在arguments一栏设置最大优先级【表示只能设置0-设置值之间的优先级,用太大对cpu和内存浪费性能,因为有对优先级的排序】

  3. 优先级队列的代码实现

    注意这种排序是基于队列中有一定数据量情况下的排序,否则发一个就被马上消费,可能观察不到排序的现象,演示为了简单,直接在一堆消息发送完毕的情况下再启动消费者进行消费,实际情况很复杂,因为动态添加数据,出数据的时候又在进数据,很好奇实际是怎么实现的

    实现在first包下

    • 代码中设置队列为优先级队列、设置优先级范围并设置被发送消息的优先级

      要点:

      • 使用params.put("x-max-priority", 10);channel.queueDeclare(QUEUE_NAME, true, false, false, params);设置声明优先级队列

      • 使用AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();把properties设置为消息发送的其他参数设置消息的优先级

    • 消费者

    • 测试效果

      【优先级队列属性标记】

      用Pri标记优先级队列

      【消息发送】

      【消息接收】

      优先级高先接收,默认优先级是0吗?

 

惰性队列

判断惰性队列的标准是消息是保存在内存还是磁盘上,正常情况下消息保存在内存中,惰性队列消息保存在磁盘中,每次接收消息都会把消息写入磁盘,速度很慢,一般不采用惰性队列,只有在大量的消息堆积但是暂时没有消费者,防止大量消息占用内存需要使用队形队列

RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列将消息存入磁盘,在消费者消费到相应的消息时才会被加载到内存中,

它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。

即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份【下一页待处理的消息】。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中【类比于分页查询】,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候。

  1. 队列的两种模式

    • defalut模式

      • 默认是default模式,

    • lazy模式

      • lazy模式是惰性队列的模式,通过方法channel.queueDeclare方法进行设置,也可以通过Policy策略方式设置【通过控制台设置Queue--add a queue--lazy mode】,一个队列同时使用这两种方式设置,Policy的方式具有更高的优先级

      • 声明惰性队列的代码

  2. 惰性队列的性能

    • 内存开销

      在发送 1 百万条消息并积压的情况下,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅占用 1.5MB

       

RabbitMQ集群

添加其他RabbitMQ服务器,将其加入1号节点服务器就可以形成集群,比如2加入1号,4加入2号和4加入1号效果是一样的,类似于redis集群

  1. 集群架构

    添加两台新机器,都加入RabbitMQ节点1号

  2. 集群搭建实操

    • 将当前机器克隆三份并修改三台机器的ip地址,不要使其冲突【电脑好,扛得住】,使用xshell对三台机器进行远程连接

    • 使用命令vim /etc/hostname修改3台机器的主机名称为目标名称node1、node2、node3并使用命令shutdown -r now重启机器,使用命令hostname查看当前机器的主机名

    • 使用命令vim /etc/hosts添加各机器节点的ip和hostname配置各个虚拟机节点并重启机器,让各个节点能识别对方

    • 要确保各个节点的cookie文件使用的是同一个值,在node1节点上执行远程操作命令scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookiescp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie将第一台机器的cookie复制给第二台和第三台机器

    • 三台机器使用命令rabbitmq-server -detached重启RabbitMQ服务、顺带重启Erlang虚拟机和RabbitMQ的应用服务

    • 以node1为集群将node2和node3加入进去,分别在node2和node3节点执行以下命令

      关闭RabbitMQ服务,将rabbitmq重置,将node2和node3节点分别加入node1节点【这里将node2节点加入node3节点观察后续移除node2节点后node3的效果,凉了手速过快,一起连上了】

      执行命令rabbitmqctl join_cluster rabbit@node1必须开放node1的4369和25672端口,否则会报错;网上一堆操作猛如虎,没一个讲到点上的;克隆的系统相关端口也是开放的

      我靠,血泪教训,最多只能有一个机器不开放4369和25672端口,其他所有机器都必须开放这俩接口,否则严重点会直接导致所有的RabbitMQ没有一台机器能启动,一直显示正在启动中,启动命令一直卡在运行中,其他的rabbitmq命令报错消息还很傻逼,只会提醒应用没启动,网上还没啥解决方案【fuck】,最后只启动node1发现突然能启动,且能进后台,然后启动node2突然能启动了,node3死活启动不了,终于开放node2的两个端口后node3就能自动启动了,为了方便以后不出问题,建议所有机器节点都开放这俩端口,连带5672端口和15672端口

      【没开放端口的情况】

      【开放4369端口的情况和开放了25672端口的情况】

    • 使用命令rabbitmqctl cluster_status查看集群状态

      2号节点一直在启动,不知道为啥

    • 只需要在一台机器上使用以下命令重新设置用户

  3. 搭建成功标志

    • 进入网页服务界面能看到3个RabbitMQ节点【状态都是绿色就表示非常健康】

  4. 解除集群节点的命令【node2和node3分别执行以脱离,最后测试一下2号机脱离通过2号机联机集群的3号机的状态,手快了全绑在node1下了】

    【脱离机器node2或node3分别执行】

    【node1执行命令忘记脱离的节点】

     

镜像队列

目前每个节点上的队列不可复用,某个节点突然宕机,队列会直接不可用,队列中的消息会丢失,即使是持久化的消息也会存在在持久化的过程中时间不够消息丢失

  1. 节点队列不可复用演示

    • 在node1上创建hello队列,使用命令rabbitmqctl stop_app关闭node1服务,观察控制台node1队列的状态

      【node1关闭】

      【关闭后队列情况】

      和课堂演示不同,压根连队列都直接不显示了,那是因为队列没有持久化

      【持久化以后】

      NaN表示不是一个数字,非法值

      【使用其他节点访问该队列会报错并提示队列down了】

    • 重启以后队列以后发现队列中的消息没了

      也没有被消费

       

  2. 镜像队列

    镜像队列就是对其他节点队列的备份,引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。

    可以将节点队列在其他节点上备份一份,也可以每个节点上都备份一份,但是要根据情况,如果全都备份会很浪费资源,这样不好

    • 镜像队列备份策略搭建【通过控制台】

      在admin菜单下users下点击Policies--添加新的策略,表示给/添加策略

      name是随便起的,只是表示这个镜像对列的名字

      pattern是镜像匹配的队列,是一个正则表达式,^mirrior表示给以mirrior为前缀的队列或交换机整个镜像【名为hello的队列就不能被镜像,mirrior_hello这个队列就可以】

      Apply to表示应用于交换机和队列

      Definition表示设置一些参数

      • ha-mode:HA 全称high available 高可用,mode表示高可用的备份模式,exactly表示指定具体备份几份

      • ha-params:表示指定备份的具体份数,这里表示备份两份【这个两份包括主机在内一共两份】

      • ha-sync-mode:表示自动同步数据,自动设置为automatic,表示自动进行同步,也可以设置成手动,但是手动同步比较麻烦

    • 备份效果

      • admin中显示当前的备份策略

      • 此时在node1创建匹配策略名字的队列,会再多备份1份,备份的一份队列可能在node2节点,也可能在node3节点上,具体由服务器决定【有备份的队列会在Node上显示+1,即额外备份的数量】

        点进具体的队列会在mirrors显示具体备份的节点

        【队列详情】

      • 关闭节点1,备份的镜像队列会自动在Node属性栏显示正在node3节点运行,同时还会再备份一份在其他节点node2上【牛皮】

        没有备份的队列都噶了【down】

        能够达到就算整个集群只剩一台机器也能处理之前宕机的节点中的队列和数据,宕机会自动再备份到其他服务器上

        【再次备份】

        一台宕机以后,在其他节点上再次备份一份维持备份策略要求的2份策略

      • 启动消费者

        发现消息仍然被消费了

        注意,这时候消费者对应的节点地址也必须跟着变才能接收到消息,使用宕机节点的地址仍然会报错消息队列不可用【经过测试确实如此】

        不足:没有介绍消费者针对集群的连接设置,因为消费者要自己判断机器是否宕机和切换节点地址,生产者此时也有相同的问题【发送消息】,写死了ip;这已经不能由RabbitMQ自己解决,需要借助软件Haproxy实现负载均衡,Twitter、Reddit、StackOverflow、GitHub等都在用,类似与这种负载均衡软件还有nginx、lvs,软件区别:http://www.ha97.com/5646.html

         

         

Haproxy实现负载均衡

HAProxy 提供高可用性、负载均衡及基于 TCPHTTP 应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案,包括 Twitter,Reddit,StackOverflow,GitHub 在内的多家知名互联网公司在使用。HAProxy 实现了一种事件驱动、单一进程模型,此模型支持非常大的井发连接数。

高可用:某个机器宕机了,有备机接替他的工作,系统能正常运行

  1. 整体架构

    • 生产者发送的消息通过ip找到haproxy主机【有haproxy备机】,由主机负责转发消息到不同的消息队列节点,只需要将消息发送到http://10.211.55.71:8888/stats【具体看文件或者博客】

    • 主机宕机会被keepalive软件发现将ip漂移到备机上,备机再负责消息向消息队列转发【备机也会定期监测主机是否还还活着,收不到主机的消息就会自动启动】

    • keepalive主要是为了高可用,能够通过自身健康检查、资源接管功能做高可用(双机热备),实现故障转移.

    • haproxy+keepalive方案就能解决rabbitmq集群的转发和负载均衡问题 【具体操作看文档或者博客,没讲】

     

Federation Exchange

联合交换机,相距很远的机房之间存在网络延迟,消息队列可能被设置在相隔很远的机房,机房周边区域使用特定的机房来访问降低延迟,但是存在数据不一致的问题,使用联合交换机解决数据不一致的问题

这部分讲的太水了,有相同的应用场景再回来看

  1. 搭建步骤

    • 在每台机器上开启federation相关插件【自带的】

      • 使用命令rabbitmq-plugins enable rabbitmq_federation开启对应插件

      • 使用命令rabbitmq-plugins enable rabbitmq_federation_management

    • 安装好以后控制台admin菜单能看见多出来Federation StatusFederation Upstreams菜单【一般是一个机器固定同步数据给另一台机器】

  2. 联合交换机原理

    • node1理解为北京,node2理解为深圳;1号节点算上游【数据由上游同步到下游,水流类比】,2号节点算下游;

      1号节点的交换机【在上游配置2号节点地址】要配置2号节点的地址,1号节点的交换机在同步数据之前2号节点交换机必须有和对应1号节点交换机同名的交换机,没有会报错

      文档又说下游配置上游节点地址,服了

  3. 步骤演示

    • 在node2上创建node1上需要同步数据的交换机fed_exchange,绑定node2上的队列

    • 在客户端配置上游地址

      注意是在下游节点配置上游节点的地址,在node2配置node1的地址,把node1的数据同步到下游node2

      【设置效果】

    • 设置策略

      【设置效果】

      这个只是表示node2能不能连接上node1

 

Federation Queue

联邦队列,联邦交换机和联邦队列都可以实现两地间数据的交换

一个联邦队列可以连接一个或者多个上游队列(upstream queue),并从这些上游队列中获取消息以满足本地消费者消费消息 的需求。

联邦队列和联邦交换机都没有演示效果

  1. 原理

    node1的fed.queue想将数据同步到node2进行消费【不影响node1对同步数据的消费,应用场景就是深圳对北京的数据同步】,需要先将node2的fed.queue队列联合到node1的fed.queue队列

    node2配置node1的地址已经在联和交换机中配置好了,这里只需要配置策略

     

  2. 步骤演示

    • 在node2创建对应node1的fed.queue

    • 在node2配置node1的地址【同上面的联合交换机,upstream表示上游的意思】

    • 添加联合策略

      【联合策略】

      一个是联合交换机策略,一个是联合队列策略,上游都是node1

       

       

 

Shovel

还是做数据备份或者转发的,和联合交换机、队列的作用差不多,将一个节点的数据【作为源端】拉取转发到另一个节点【目的端】

shovel可以翻译为铲子

  1. 架构图

    • Q1是源端、Q2是目的端;

      发送数据给Q1,Q1会将数据同步到Q2中

  1. 搭建

    • 像federation一样shovel安装插件并在控制台可以看见shovel status和shovel upstream

    • 配置shovel策略

      配置以后,node1节点的Q1中的消息都会同步到node2节点的Q2,解决跨地区数据同步的问题

      name是自定义的,和联合队列是一样的,最好见名知意

      【配置状态】

RabbitMQ项目应用



  1. 概念介绍

    • Publisher消息生产者:是向交换器发布消息的客户端应用程序

    • Message消息:消息由消息头和消息体构成,

      • 消息头中有很多的配置项

        • route-key路由键:

        • priority:相较于其他消息的优先权

        • delivery-mode:配置消息是否需要持久性存储

      • 消息体中是真正的消息内容

    • Broker消息服务器:也就是上面说的消息代理

    • Exchange交换器:这个交换器用来接收生产者传递过来的消息并将这些消息路由给服务器中的各种队列,交换器有4种类型,不同交换机转发消息的策略有区别,这个类似于网络交换机,一个消息服务器中可能存在多个不同类型的交换器,交换器接收生产者传递来的消息并按照既定策略将消息路由到指定的队列中,一个消息代理中可能含有多个消息队列,消息队列和交换器之间有预设的绑定关系Binding

      • Direct[默认]:直接交换器

        • 其中Direct和Headers都是JMS中点对点通信模型的实现,Headers匹配AMQP消息的消息头而不是路由键,Headers交换器和Direct交换器除了匹配路由不同其他都完全一致,但是Headers的性能比较低下,一般都不讨论,也几乎不用;主要讨论Direct、Fanout和Topic

        • Direct Exchange直接交换器:该交换器将消息交给一个指定的队列,消息中的路由键routing key只能唯一匹配与Bingdingbingding key完全相同的队列,比如一个路由键为dog的消息只会被直接交换机路由到绑定关系中binding keydog的队列,核心是routing key和绑定关系中的binding key一模一样才能进行匹配,这也叫完全匹配、单播模式,也称为点对点模式

      • Fanout:扇出交换器

        • Fanout和Topic都是发布订阅模式的实现,这种是广播模式的实现,无条件将消息发给所有与交换器绑定的队列

        • Fanout Exchange扇出交换器:这种交换器根本不关心交换器的路由键是什么,一个扇出交换器可以绑定多个队列,每个发送到扇出交换器的消息都会被广播到与扇出交换器绑定的所有队列上,很像子网广播,每台子网内的主机都会获得一份复制的消息,Fanout交换器转发消息是最快的

        • 给扇出交换器发送的消息不指定routing key也是可以的,所有和扇出交换器绑定的队列都能接收到消息;指定了路由键也不会对路由键进行判断处理

      • Topic:主题交换器,对应发布订阅模式,主题交换器对应的是根据路由键将消息路由到模式匹配的一个或者多个与交换器绑定的队列

        • 一个主题交换器绑定多个队列,每个Bingding中都有一个模式bingding key,这个模式由两个通配符#*以及单词和点构成,其中通配符#匹配0个或者多个单词,注意不能使用#匹配字母;通配符*匹配一个单词即被匹配的路由键对应位置必须有一个单词,单词之间使用点进行分隔,只有路由键匹配对应绑定关系的bingding key消息才会被转发到对应的一个或者多个队列上

        • 比如bingding key=usa.#即匹配rounting key以单词usa开头的,bingding key=#.news匹配routing keynews作为后缀的

      • Headers:

         

    • Queue队列:是消息的容器,用于保存消息直到消费者连接该队列将该消息取走,一个消息可以投入一个或者多个队列

    • Binding绑定:用户关联消息队列和交换器,绑定是基于路由键将交换器和消息队列关联起来的路由规则,可以将交换器理解成一个由绑定构成的路由表,交换器和队列的绑定关系是多对多关系

    • Connection连接:每个客户端都只会和消息中间件建立一条长连接来收发消息,长连接就是一直保持连接状态的连接,连接类型是TCP连接,消费者可以通过该一条连接同时接收来自多个队列的消息

    • Consumer消费者:从消息队列中取得消息的客户端应用程序

    • Channel信道:Java的NIO中也有信道的概念,信道是多路复用连接中的一条独立的双向数据流通道,信道是建立在真实的TCP连接内的虚拟连接,AMQP命令、发布消息、订阅队列或者接收消息都是通过信道完成的,通过在一条长连接上开辟多条信道,每条信道负责各自的收发消息通信,接收消息是使用信道来接收指定队列的消息,对于操作系统来说建立和销毁TCP连接都是非常昂贵的开销,通过信道来实现对一条TCP连接的复用

      • 同时通过长连接,一旦消费者宕机导致连接中断,我们的消息代理能够实时感知到消费者下线,消息无法被消费者获取,就会立即将消息存储起来不再向外派发,不会造成消息大面积丢失;如果消息代理不能实时识别消费者的连接状态,消费者宕机的情况下仍然将消息发送给消费者并删除对应消息,消息就丢失了

    • Virtual Host虚拟主机:虚拟主机标识一批交换器、消息队列和相关对象,虚拟主机的作用是将多个交换器、多个队列作为一个整体和其他的虚拟主机隔离开,避免一个虚拟主机由于一套系统的突发情况导致消息队列中间件崩溃同时影响到使用另一套虚拟主机的其他系统

      • 虚拟主机以路径作为标识,不同的虚拟主机位于同一个消息服务器即Broker中,不同的虚拟主机相互隔离,在使用上就像在机器上安装了另外一台RabbitMQ服务器

 

Docker安装RabbitMQ



 

  1. 安装步骤

    • 使用命令docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management安装RabbitMQ,其中

      • 436925672端口是Erlang发现端口和集群端口,RabbitMQ是用Erlang语言编写的

      • 56725671是AMQP端口

      • 15672是web管理后台端口

      • 6161361614是STOMP协议端口,开启了STOMP协议才需要开放该端口

      • 18838883是MQTT协议端口,开启了MQTT协议才需要开放该端口

      • RabbitMQ的RabbitMQ官方文档对这些端口有具体说明

    • 使用命令docker update rabbitmq --restart=always让容器随着Docker启动自动启动

    • 使用命令docker exec -it rabbitmq /bin/bash进入容器rabbitmq的bash命令控制台,无需停止rabbitmq直接使用命令rabbitmq-plugins enable rabbitmq_management,也无需重启容器

      • 后台管理系统http://192.168.56.10:15672的默认账号和登录密码都是guest,如果不执行上述步骤只能打开登录页无法登录进入首页

      • 如何在docker内部使用命令停止rabbitmq没学过,使用原生rpm安装的的Rabbitmq命令提示找不到对应服务

WEB界面简介

  1. 界面简介

    • Overview:RabbitMQ服务器的运行状况概览,Web管理界面的数据每5秒刷新一次,默认访问的是所有虚拟主机,有一个默认的虚拟主机/

      • totals:包含Queued messages[消息队列中的消息]Currently idle[当前消息服务器中的空闲信息]Message rates[消息的收发速率]Global counts[监控的全局属性,包括有多少条连接,多少个信道、多少交换器、多少个队列以及多少个消费者]

      • Node:列举RabbitMQ的节点信息,因为当前不是集群,因此只列举了一个节点,展示了节点的内存、磁盘空间占用

      • Churn statics:以图表的形式列举静态统计数据,每秒有多少个链接、多少个信道、多少个队列

      • Ports and contexts:展示RabbitMQ的监听端口[比如客户端使用高级消息队列协议连接消息代理收发消息就要使用AMQP协议的通信端口5672、集群端口25672和Web端的端口15672,注意Web上下文的端口也是15672,绑定了IP地址0.0.0.0[意思是所有人都可以访问该Web端口]

      • Export definitions:可以做老RabbitMQ服务器配置迁移,比如我们新装的RabbitMQ想使用已有配置可以直接在该选项中下载消息服务器配置下载相应配置文件

      • Import definitions:可以在该选项卡中通过上传从别的RabbitMQ集群中下载的配置文件,一键将已有配置应用到新的RabbitMQ集群中来让多个集群或者机器保持相同的配置

    • Connections:该选项卡监控当前RabbitMQ服务器有多少个客户端和服务器建立了连接,注意一个客户端只会和一个服务器建立一个连接

    • Channels:一条连接会有多条信道,所有的信道都会在该选项卡中展示出来

    • Exchanges:该选项卡会列举RabbitMQ中所有的交换器,默认的交换器有7个,并展示所有交换器的名字、类型、交换器特性、消息进出交换器的速率

      • Add a new exchange:我们可以通过该选项卡创建新的自定义交换器

      • Publish message:Web客户端实际上就是一个消息队列客户端,我们可以在web界面该选项卡直接发消息,Routing key是消息的路由键,Payload是消息具体内容

    • Queues:该选项卡列举RabbitMQ中的所有队列,Ready是队列中准备被消费的消息数量、Unacked是队列中还没有收到消费者消息确认的消息数量

      • 队列列表中的Features字段表示队列的配置信息,DDurable表示当前队列是持久化的,DLX表示当前队列设置了死信交换器,DLK表示当前队列的死信消息被设置了路由键,Args是当前队列被设置的其他参数比如x-message-ttlTTL表示当前队列设置了消息的存活时间

      • Add a new queue:我们可以通过该选项卡创建自定义的队列

      • 点进具体的队列使用选项卡Get Message能使用Web客户端获取指定队列中的消息

        • Ack Mode:回复模式,选择Nack message requeue true是当前Web客户端拿到消息后不告诉服务器自己拿到消息了,RabbitMQ会将消息重新入队列,其中Nack表示收到消息不回复,message requeue true表示开启消息重新入队列

    • Admin:这是RabbitMQ的管理设置功能,我们可以通过选择右侧的选项卡在这里设置用户信息、虚拟主机信息[显示虚拟主机的消息、客户端、消息的获取派发速率等数据]、特性标识、配置策略、Limits[对虚拟主机的连接限制,可以设置RabbitMQ的最大连接数和最大队列数]Cluster展示和配置集群信息

      • Add a user:我们可以通过该选项卡添加新的用户

      • Add a new virtual host:虚拟主机是通过路径来区分的,我们可以通过该选项卡来添加新的虚拟主机,通过点击虚拟主机的名字我们还可以对虚拟主机进行更细致的配置[比如删除虚拟主机]

    • 底部的选项卡会列举RabbitMQ的一些官方文档和不太重要的信息,我们一般使用该界面来管理RabbitMQ中的交换器和队列

 

基本用法



  1. RabbitMQ的运行机制

    • 一个交换器可能和多个队列都有绑定关系,一个队列也可以被多个交换器绑定;生产者将消息发布到交换器上,交换器根据绑定关系和消息的路由键决定将消息发送到指定的队列上,整个过程就是消息路由的过程

    • 注意消息是发送给交换器,监听消息是监听交换器

  2. 默认交换器

    • RabbitMQ默认有七个交换器,其中两个直接交换器,一个扇出交换器、两个Headers交换器和两个主题交换器

  3. 创建交换器

    • 创建交换器指定交换器的名字,交换器的类型、交换机是否持久化或者设置为临时,持久化的交换器在RabbitMQ服务器重启以后仍然存在,但是临时交换器只要RabbitMQ一重启就没了,

    • 自动删除设置为YES当交换器没有任何队列绑定在该交换器上该交换器就会自动删除

    • Internal设置为yes即表示当前交换器为内部交换器,客户端不能给该交换器转发消息,内部交换器只是供RabbitMQ内部转发路由使用的

    • 一般自动删除和内部交换器都设置为默认的No

    • 通过交换器列表的名字点进交换器我们可以查看交换器更详细的绑定信息、消息发布信息,设置交换器的绑定关系

      • 交换器可以和交换器进行绑定,交换器也可以和队列进行绑定,通过这种机制可以实现交换器绑定交换器再绑定到队列,实现多层路由

      • 配置绑定关系指定的routing key就是上面说的Binding中的binding key

  4. 创建队列

    • 创建队列,指定队列名字、指定队列是否持久化

    • 如果队列自动删除设定为yes,只要没有消费者连接监听该队列,队列就会自动删除

  5. 将交换器与队列进行绑定并指定binding key

  6. 向交换器发送消息

     

延时队列



  1. 场景

    • 📜:下订单如果三十分钟以后没有支付就关单,锁定库存成以后四十分钟如果订单没有创建成功或者订单被取消就释放被锁定的库存

      • 💡:方案一是系统使用定时任务每隔1分钟就去扫描数据库检查哪些订单还没有支付,如果其中有订单到期了就将订单删除;锁定库存四十分钟仍然有锁库存记录且订单没有被支付或者订单没有被创建就解锁库存

        • 缺点:定时任务消耗系统内存,每隔一段时间就要全盘扫描一次增加数据库压力,定时任务最大的问题是有较大的时间误差,即我们开启定时任务的根据不是以每个业务作为起点的,而是以每个服务的某个系统时间作为起点的,但是业务的创建时间是随机的,我们只能通过逻辑判断业务是否在定时任务时刻满足到期条件,这不可避免地会导致业务实际到期时间出现偏差,偏差越小我们的定时任务就越频繁,定时任务对系统内存和数据库的压力就越大

      • 💡:方案二是使用RabbitMQ的延时队列,延时队列是结合消息的存活时间TTL和死信路由Exchange来结合实现的,我们创建订单成功可以给延时队列中存放一条消息,消息到达指定时间后被转发给监听队列的服务,即延时队列的消息最大的特点是消息在指定时间后才能被消费者接收到;锁顶库存成功了我们就给另一个延时时间40分钟的延时队列也发送一条锁定库存成功的消息,延时时间到了以后再给库存服务发送消息,库存服务拿到消息检查订单如果没有支付或者订单压根没有成功创建就去解锁被锁定的库存

        • 延时队列实现的定时任务能解决系统定时任务带来的大量业务的时效性问题,延时队列的时效性只会因为网络波动重试等差上几秒钟,但是系统定时任务不仅占用系统和数据库资源,还会存在巨大的业务时效性问题

  2. 延时队列

    • 消息的TTL[Time To Live]:消息的存活时间,RabbitMQ可以给队列和消息都分别设置存活时间,不论给队列还是消息设置存活时间,存活时间的含义都是从消息进入队列开始到达存活时间消息仍然没有被消费者消费,消息就会变成死信,RabbitMQ服务器会默认将死信直接丢弃

      • 对队列设置TTL是没有消费者连接时消息在队列中的最大保留时间

      • 如果队列设置了TTL、同时消息也设置了TTL,会选取两者中小的TTL作为当前消息的TTL,这也意味着如果一个消息被路由到不同的队列中,这些消息的存活时间可能不会相同

      • 消息的存活时间设置:通过设置消息的expiration字段或者x-message-ttl属性来设置消息的TTL,两种设置方式的效果是相同的

    • 死信

      • 一个消息满足以下条件就会进入一个死信路由,这个死信路由可以对应很多队列

        • 消息被消费者拒收,并且手动消息确认时有一个reject方法中的重新入队参数requeuefalse,即消费者收到消息但是拒签消息而且标记了不让消息重新入队列

        • 消息的存活时间到了,消息过期

        • 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上

      • 死信交换器Dead letter Exchange就是一种普通的交换器,只是一个队列设置了死信交换器,一旦消息过期就会自动触发消息转发到死信交换器中

    • 延时队列的实现是设置一个队列中的消息存活时间为指定值,队列不能让任何消费者监听,让队列在消息有效时间内一直保存消息,消息一过期,让消息进入死信交换器,死信交换器再将消息路由到绑定了指定消费者的队列直接将消息转发给消费者,相当于在正常的消息转发路径上添加了一个没有消费者监听的队列来在指定时间内等待消息自动失效被死信队列转发

      • 延时队列实现1:

        • 消息生产者以deal.message作为路由键发送消息给交换器x将消息路由到延迟队列delay Sm queue,该队列相较于普通队列多了三项设置[队列消息存活时间x-message-ttl:300000,单位是毫秒;设置死信交换器x-dead-letter-exchange:delay exchange为交换器delay exchange,即队列中的消息过期了自动转发给死信交换器delay exchange;设置死信转发给死信交换器的路由键x-dead-letter-rounting-key:delay messagedelay message,死信交换器会根据该路由键将死信转发到对应绑定键的队列test queue中,并将消息发送给消费者]

      • 延时队列实现2:

        • 这个实现其实就是将上面给队列设置消息过期时间改成了单独给每个消息设置过期时间,消息生产者发送消息是给消息的expiration字段设置expiration:300000,将延迟队列delay Sm queue设置死信交换器为delay exchange,将路由键设置为delay.message;消息过期以后将消息的路由键设置为delay.message并转发给死信交换器delay exchange,死信交换器根据路由键和绑定键将消息转发给消息队列test queue,消息队列将消息发送给消费者

      • 一般我们会采用给队列设置消息过期即方案1的方式,因为RabbitMQ采用的是惰性检查机制,也叫作懒检查,懒检查就是RabbitMQ只会在队列头消息过期的时间点来检查头节点的有效时间是否过期,过期了就将该消息作为死信;此时才会检查下一个消息是否过期,如果下一个消息早就过期了才会将消息设置为死信,但是给整个队列设置同一个过期时间就不会出现这种问题,因为是以消息到达队列时开始计算相同的过期时间,即消息头的节点没过期后续的节点永远不会过期

      • 延时队列实现3:

        • 在实现1的基础上我们可以简化为如下实现,即将两个交换器合并为一个交换器,根据消息前后的路由键不同由一个交换器将同一条消息分别路由到两个不同的消息队列中

    • 以延时队列3为例通过向Spring容器中注入组件的方式来创建延时队列

      • 一个交换器绑定多个队列使用路由键模糊匹配一般都使用主题交换器

  3. 测试延时队列

    [消息生产者]

    [消息消费者]

    • 注意因为此前我们在订单服务的配置文件中使用了配置spring.rabbitmq.listener.simple.acknowledge-mode=manual开启了消息消费者接收消息手动确认模式,因此这里我们获取到信息以后一定要拿到信道通过信道手动应答

    • 该延时队列的效果是发送消息一分钟后消费者收到消息

     

 

 

 

SpringBoot整合RabbitMQ



 

  1. 引入依赖

  2. 开启使用RabbitMQ的功能

    • 在配置类上使用注解@EnableRabbit开启RabbitMQ的相关功能

  3. 配置RabbitMQ服务器信息

     

自动配置原理
  1. 自动配置原理

    • 引入amqp场景启动器自动配置类RabbitAutoConfiguration会自动生效,该自动配置类会自动给容器注入组件CachingConnectionFactoryRabbitTemplateAmqpAdminRabbitMessagingTemplate

    [RabbitAutoConfiguration]

    [RabbitProperties]

    • 所有关于RabbitMQ的配置都以spring.rabbitmq作为前缀

     

AmqpAdmin


API
  1. void ---> amqpAdmin.declareExchange(Exchange exchange)

    • 功能解析:在RabbitMQ服务器中创建一个交换器

    • 使用示例

      • 示例含义:在RabbitMQ服务器中创建一个名为mall-direct-exchange的直接交换器

    • 补充说明

      • 交换器Exchange是一个接口,有一个抽象子类AbstractExchange,该抽象子类有五个子实现类,分别为下列所示,通过这五个子实现类来创建对应类型的交换器

        • DirectExchange:直接交换器

          • 全参构造为public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments),分别表示交换器的名字、交换器是否设置为持久化、交换器是否自动删除以及为交换器指定键值对形式的参数,如果无需指定参数可以使用不带该参数的重载构造方法,默认也是创建的持久化交换器和非自动删除的交换器

        • HeadersExchange:Headers交换器

        • FanoutExchange:扇出交换器

        • TopicExchange:主题交换器

        • CustomExchange:自定义交换器

  2. void ---> amqpAdmin.declareQueue(Queue queue)

    • 功能解析:在RabbitMQ服务器中创建一个队列

    • 使用示例

          • 示例含义:在RabbitMQ服务器中创建一个名为mall-hello-queue的队列

        • 补充说明

          • 队列Queue只是一个类,不是接口也没有子类,我们直接通过实例化Queue对象就能声明一个队列,注意这个Queue不是java.util包下的,是org.springframework.amqp.core包下的

          • 队列全参构造public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments),参数分别为队列的名字、是否持久化、是否排他[排他是指该队列只能被一条连接独占,只要有一条连接连上了该队列,其他连接都连不上该队列,实际开发队列不应该是排他的,我们更希望多个客户端来连接同一条队列,只是最终只有一个客户端获取到消息],是否自动删除,为队列配置一些参数,如果不需要指定参数可以使用不带该参数的重载构造方法,注意这里的参数是队列的相关配置,参数示例列举如下

            • 队列中消息的最大存活时间,

            • 队列的死信交换器,

            • 死信消息的路由键等

      1. void ---> amqpAdmin.declareBinding(Binding binding)

        • 功能解析:在RabbitMQ服务器中创建一个绑定关系

        • 使用示例

          • 示例含义:在交换器mall-direct-exchange和队列mall-hello-queue之间创建一个绑定关系

        • 补充说明

          • Binding也是一个类,没有子类

          • Binding的全参构造public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,Map<String, Object> arguments)

            • 参数destination是目的地名字[这个目的地可以是队列名字也可以是交换器名字]

            • DestinationType是目的地类型[目的地类型可以是交换器也可以是队列,这个DestinationType是一个枚举]

            • exchange是我们要进行绑定的交换器名字

            • RountingKey就是绑定关系中对应的Binding key要匹配消息的RoutingKey

            • 注意Binding的构造不传参自定义参数必须要指定为null,没有对应不含该参数的构造

       

       

RabbitTemplate


API
  1. void ---> rabbitTemplate.convertAndSend(String exchange,String routingKey,Object object)

    • 功能解析:该方法将我们传入的object对象转换成字节流数据发送给RabbitMQ服务器中指定的交换器

    • 使用示例

      • 示例含义:在RabbitMQ服务器中创建一个名为mall-direct-exchange的直接交换器

    • 补充说明

      • rabbitTemplate有原生的send方法也可以发送消息,但是该方法需要传参被封装成Message类型的消息

      • 参数exchange是交换机的名字,参数rounting key是消息的路由键,参数object是消息本身

      • RabbitMQ队列中存储的消息的内容是被编码过的,默认的消息类型是application/x-java-serialized-object即默认是使用的Java序列化器来进行的编码,这要求作为消息发送的对象对应的类必须实现了序列化接口Serializable

      • 序列化实际上是amqp包下的MessageConverter在起作用,MessageConverter是一个接口,在抽象类AbstractMessageConverter中有一个子实现类AbstractJackson2JsonMessageConverter[注意Jackson2Json意思是通过Jackson转成json,2是to的谐音],注意AbstractMessageConverter中还有一个子实现类WhiteListDeserializingMessageConverter,默认配置的SimpleMessageConverterWhiteListDeserializingMessageConverter的一个子类,要自定义序列化机制就要给容器注入一个MessageConverter组件,我们想将消息序列化成一个json对象就可以通过向容器注入一个AbstractJackson2JsonMessageConverter来实现

      • 这个感觉设计的很糟糕,该方法不返回任何值,发送消息失败比如没有创建绑定关系消息无法入队列不会报错没有返回值也不会抛异常,出了问题都不知道

 

定制消息序列化器


  1. 原理

    • RabbitTemplate中的messageConverter就是给消息对象做序列化的,在自动配置类RabbitAutoConfiguration中会优先从容器中获取MessageConverter类型的组件作为RabbitTemplate的MessageConverter,

    • 如果容器中没有就会默认自己实例化一个SimpleMessageConverter对象来做序列化,消息转换器SimpleMessageConverter会对消息进行判断,如果消息对象是String类型,直接获取String类型的bytes数组,如果不是String类型且实现了Serializable接口就使用序列化工具SerializationUtils的serialize(object)将对象转换成bytes数组,即序列化实际上是amqp包下的MessageConverter在起作用,

    • MessageConverter是一个接口,在抽象类AbstractMessageConverter中有一个子实现类AbstractJackson2JsonMessageConverter[注意Jackson2Json意思是通过Jackson转成json,2是to的谐音],注意AbstractMessageConverter中还有一个子实现类WhiteListDeserializingMessageConverter,默认配置的SimpleMessageConverterWhiteListDeserializingMessageConverter的一个子类,

    • 要自定义序列化机制就要给容器注入一个MessageConverter组件,我们想将消息序列化成一个json对象就可以通过向容器注入一个AbstractJackson2JsonMessageConverter来实现

  2. 配置步骤

    • 向容器中注入AbstractJackson2JsonMessageConverter来替代默认的SimpleMessageConverter来将消息对象序列化成json对象

      • 注意使用AbstractJackson2JsonMessageConverter,在web管理端界面我们获取到消息后能观察到消息的content_type由原来的application/x-java-serialized-object变成了application/json

      • 而且在消息头中还有一个_TypeId_字段,记录者消息的全限定类名

       

     

@RabbitListener


  1. 监听消息并获取消息头和消息体

    • Body就是消息本身

    • messageProperties是消息属性,就是消息头中的属性值[即消息类型ID、消息内容类型等等]

    • 这里我们用Object接受消息,实际上message的真正类型是org.springframework.amqp.core.Message,因此我们直接将Object类型改成Message类型

      • 我们可以通过byte[] body=message.getBody()获取消息体的内容,通过MessageProperties properties=message.getMessageProperties()获取消息头的属性

    • 这种方式获取的消息体实际上是一个字节数组,我们要将其转换成指定对象需要使用FastJson这样的将JSON对象转换成实体类的解析工具

  2. 监听消息并将消息体自动转换成消息对应类型

    • 我们还可以参数列表通过指定消息的实际类型,让Spring自动将消息体转换为对应的类型,因为消息头中保存了消息体的全限定类名,但是这个转换方法很有意思,应该不是使用的fastjson,我们对fastjson很熟悉,但是对这个转换方法不太清楚

  3. 监听消息并获取信道

    • 获取当前传输数据的信道,每个客户端只会与RabbitMQ建立一个连接,但是可以在一个连接内创建多个信道,该信道一般用在可靠性投递场景中

  4. 注意

    • 一个队列可以被很多个客户端监听,但是最后只有一个客户端能收到消息,只要有一个客户端收到消息队列就会删除消息,而且还会保证只能有一个客户端成功获取该消息

    • 如果上述代码因为多个服务实例以上代码同时在三个服务实例中生效三个服务实例中的上述代码同时监听一个队列中的消息,最后也只会有一个服务实例成功获取到消息,弹幕说这个过程还可以应用负载均衡策略

    • 卧槽,单元测试相当于新开一个服务实例,如果我们使用单元测试发送消息,消息发送出去单元测试的服务实例还没来得及销毁也会在期间监听并获取到消息

    • 一个服务实例的一个监听消息方法获取消息的过程是加了锁的,只有当前服务实例获取到消息并完成执行完被标注方法才会释放锁,当前服务实例才能获取监听队列中的下一个消息并加锁执行被标注方法,即只有获取到一个消息并将被标注方法执行完,当前方法才能继续获取下一个消息

    • @RabbitListener除了标注在方法上还可以被标注在类上,但是@RabbitHandler只能标注在方法上

      • 实际开发中一般@RabbitListener@RabbitHandler一起使用,将@RabbitListener标注在类上依赖指明要监听的所有队列,将@RabbitHandler标注在方法上用来指明消息将要执行的方法,通过参数列表的参数封装类型和消息头中的全限定类名来匹配为同一个队列的不同封装类型的消息执行不同的指定业务方法,也可以实现不同的队列去执行各自消息封装类型作为参数的方法[实际我感觉这么用是多此一举,我完全可以在多个方法上都标注@RabbitListener注解嘛,这里在开发中再细细体会],不过@RabbitHandler标注在自定义重载方法上只是区分不同的消息封装类型处理方法倒也有点意思

        • 注意这里有个坑!必须给消息对象的封装类型提供一个无参构造器!否则会报错!

     

     

可靠性投递


  1. 可靠性投递的消息投递流程

    • 生产者发送消息给RabbitMQ服务器,RabbitMQ服务器收到消息后将消息交给交换器,交换器根据投递策略将消息传递给各个队列,这就是整个发送消息过程,在发送消息过程我们有两个发送者的确认回调[在消息投递的不同时机触发的回调函数]来保证消息的可靠发送

      • 如果生产者的消息成功到达Broker就会触发生产者的确认回调confirmCallback方法

      • 到达Broker的消息在交换器投递给队列的过程中也可能出现投递失败的情况,当消息被交换器没有成功投递到队列中时会触发第二个生产者的确认回调returnCallback方法,如果成功投递给队列就不会触发该回调方法

    • 被消费者监听的队列,只要队列收到消息后就会向消费者发送消息,从队列发出消息到消费者成功获取到消息这个过程就是消息接收过程,在消息接收过程我们有一个ack机制[acknowledge,就是消息确认应答机制]来保证接收消息的可靠抵达

      • ack机制能保证RabbitMQ服务器知道哪些消息都被消费者正确地接收到,如果消费者正确接收到消息,队列就会将对应的消息从队列中删除,如果消费者没有正确接收到消息,队列可能会采用将消息重新投递等兜底措施

  2. confirmCallback

    • 开启生产者确认回调:通过创建connectionFactory时设置PublisherConfirms(true)来设置开启confirmCallback回调,我们可以通过在配置文件中配置spring.rabbitmq.publisher-confirms=true来实现该功能,该配置项默认是false

    • 消息只要被Broker接收到就会执行confirmCallback方法,如果是cluster即RabbitMQ集群模式,需要所有的Broker都接收到才会调用生产者的confirmCallback方法,这个回调类似于Ajax的回调是成功后自动回调回来的,即使当前系统没有任何消费者监听任何队列只要消息发出被RabbitMQ服务器成功接收就会触发该回调并执行回调对象confirmCallbackconfirm方法,但是这同时意味着所有消息的发送确认回调执行方法都是一样的

    • 该回调只是保证消息成功到达RabbitMQ服务器,并不能保证消息一定会被成功投递到目标队列也不能保证消息能被成功投递到消费者

    • 注意ConfirmCallback实际上是RabbitTemplate中的一个接口,该接口中有一个confirm方法,当消息被RabbitMQ服务器成功接收就会执行用户自定义的回调方法confirm,该方法的参数列表中的correlationData是每个消息的唯一标识,ack表示消息是否被RabbitMQ服务器正确收到[true表示收到,false表示未被收到]cause表示消息没有被正常收到RabbitMQ服务器返回的原因;

    • rabbitTemplate中有一个非空私有属性confirmCallback就是该接口的实例化对象,我们只要使用注解@PostConstructIoC容器初始化时将自定义的ConfirmCallback匿名实现实例化对象并调用rabbitTemplatesetConfirmCallback(ConfirmCallback confirmCallback)方法来将自定义的消息可靠发送确认回调设置到rabbitTemplate的属性confirmCallback

      • @PostConstruct注解标注的方法在该注解所在类对应的组件对象被实例化以后立即执行被该注解标注的方法

      • correlationData:用来表示当前消息的唯一性,CorrelationData是一个类,里面用来标识唯一性的主要就是其中的id属性,我们发送消息时可以指定消息的唯一id,一般都是使用UUID,发送消息时我们可以调用rabbitTemplate.convertAndSend("mall-direct-exchange","hello.rabbitmq",returnReason);的重载方法void ---> rabbitTemplate.convertAndSend(String exchange,String rountingKey,Object message,CorrelationData correlationData)发送消息通过第四个参数指定消息的唯一标识,该标识将会被封装到发送端消息抵达确认的回调中作为消息的唯一标识,CorrelationData的单参构造就是封装其中的String类型的id属性,我们一般直接传参UUID,如果发送消息时没有指定CorrelationData回调时的参数CorrelationData就是null,实际开发中的用法一般是以上一步处理保存的数据与消息的关联关系的唯一标识作为消息的唯一标识,一旦发送消息过程消息丢失可以根据该标识再次组织消息重新发送或者定时扫描数据库哪些消息没有成功到达消息队列再重新发送

    • 保证消息可靠性的方式之一,数据库日志记录,通过状态判断,投递失败的消息通过定时任务重新发布

    • 开启步骤

      • 1️⃣:配置配置项spring.rabbitmq.publisher-confirms=true开启发送端消息抵达RabbitMQ服务器确认

      • 2️⃣:为rabbitTemplate在容器初始化时配置我们自定义的ConfirmCallback实例化对象

  3. returnCallback

    • 消息正确抵达RabbitMQ中的队列就会触发该回调,如果消息的路由键写错了无法匹配到队列、队列被删了、RabbitMQ集群是镜像集群,每个从节点的数据都是从主节点复制同步过来的,消息正常投递要求集群中的每个节点都得投递成功才行,只要有一个节点投递不成功投递也是失败的

    • 开启步骤

      • 1️⃣:配置配置项spring.rabbitmq.publisher-returns=true开启发送端消息抵达队列确认,注意是没有成功抵达队列才会触发该回调

      • 2️⃣:配置配置项spring.rabbitmq.template.mandatory=true,该配置项的意思是只要消息没有成功抵达队列,以异步发送的方式优先回调returnConfirm,突出一个异步

      • 3️⃣:为rabbitTemplate在容器初始化时配置我们自定义的ReturnConfirm实例化对象

  4. returnCallback

    • 消费者消息可靠抵达的Ack消息确认机制,该机制的原理是一旦消费者从队列中获取到消息就会自动给RabbitMQ回复确认收到消息,该机制是AMQP协议中的机制,默认该机制就是开启的,一旦消费者收到消息就会自动给RabbitMQ服务器回复确认,RabbitMQ服务器收到确认队列中的消息就会被移除

    • 注意这种自动应答Ack的消息确认机制存在很严重的问题,假如队列中有5条消息,我们通过打断点的方式只处理一条消息,注意这里老师的意思是消息接收和处理是独立过程,消息是一次性被消费者接收并且只要接收到就自动回复了[这里我不太理解,之前讲消息监听和接收时讲过一个服务实例只有在完整执行完标注了注解@RabbitListener或者@RabbitHandler的方法以后才会接收处理下一个消息,试验也证明当前消息处理期间下一个消息不会被处理,多个服务实例消息会自动被负载均衡到其他服务实例,就只能解释为只要有多个服务实例,RabbitMQ会自动决定哪些消息去往那些消费者,不会根据消息者的实际处理情况来判断下一条消息去往那个消费者。除非消息没有成功到达消费者才会重新把消息入队列再次将消息发送给其中一个消费者,感觉这也很合理,比如限制接口的QPS,频率太高的请求会自动被拒绝处理,根本到不了接收消息的那一步,也就不会有Ack应答。而对消息的处理的串行化实际上是服务内部对处理消息的方法单独上锁,这个锁并不影响消息的接收],这里把消息的负载均衡看做是RabbitMQ服务器的内部决策,消息是只要服务器有接收能力就直接发送给服务器,不会等到上一条消息被处理完再发送下一条消息,消息处理的串行化是消费者运行实例内部的处理,只要消息一到达消费者,不管消息是否被处理,都会立即Ack应答RabbitMQ服务器,RabbitMQ会直接将队列中的消息直接删除,但是消费者对同一条队列的消息是串行化处理的,一旦消费者出现问题比如宕机、该服务实例无法处理该消息等原因,还没有被处理的消息就会因为没来得及处理直接丢失,此时消息队列无法收到消息还没有被处理的通知,队列中的消息也已经删了,就算收到了通知也无济于事

      • :弹幕指出打断点然后停止服务,断点后的代码还是会执行,需要使用taskkill指令杀死进程,同时指出使用taskkill指令消息不会被自动确认,仍然留在消息队列中[这个确认时机还需要进一步明确]

      • 🔑:这里老师后面也发生了,确实手动停止服务,断点后面的代码还是会执行完,IDEA会把进程做完才关掉进程,因此这里服务器宕机会不会导致消息丢失还需要等明确自动Ack的应答时机才能确认

    • 🔑:因为自动确认只要消息接收到了就会自动Ack应答即消息还没处理就应答,一旦消息处理过程中出现了服务实例自身没法解决的问题消息就会丢失[比如服务器宕机,当前服务由于外部问题无法根本无法处理某个消息],我们的解决办法是关闭Ack自动应答,采用Ack手动应答的方式在当前服务实例成功处理了某个消息在发起Ack成功应答删除队列中的消息;如果处理失败我们就Ack应答失败让队列重新投递消息或者直接丢弃消息

      • 配置配置项关闭Ack应答的自动确认,开启Ack应答的手动确认

        • 注意只是配置该配置项没有设置确认方法一条消息都不会确认,如果服务器此时宕机连接断开,这些处于Unacked状态的消息会重新进入Ready状态,即手动确认是只要我们没有明确告诉RabbitMQ消息已经被签收,这些消息会一直处于Unacked状态,只要消费者和队列的连接断开,这些消息就会重新入队列,重新变成Ready状态

      • 手动签收消息channel.basicAck(long deliveryTag,boolean multiple)方法

        • deliveryTag是当前消息的派发标签,是一个long类型的数字,这个数字从消息头MessagePropertiesdeliveryTag属性中,即message.getMessageProperties().getDeliveryTag()获取的,这个属性值最大的特点是在当前信道内是自增的,用来标识一条信道内传输的的消息

        • multiple表示设置当前应答是批量应答还是只应答当前消息,为true表示批量应答[会一次性应答deliveryTag小于等于当前消息的所有消息],为false表示只应答当前消息

        • 该方法可能抛出异常,发生异常的原因是网络连接中断了

        • 一般这个方法都在对消息的监听处理方法中通过参数列表获取channel通过channel进行调用

      • 手动拒绝签收消息channel.basicNack(long deliveryTag,boolean multiple,boolean requeue)

        • deliveryTag当前消息的派发标签,是Channel中消息的唯一凭证

        • multiple是否批量应答,true会应答当前消息以前的所有消息

        • requeue参数的意思是当前消息是否重新入队,即消息被手动拒收以后消息是否重新发回RabbitMQ,让RabbitMQ重新放到队列中,如果该参数设置为true即消息重新入队列,如果该参数设置为false消息会被直接丢弃,直接丢弃相当于Ack拒收队列中的消息也会直接删除

        • 没有调用签收或者拒绝签收方法的消息会一直处于UnAcked状态,如果此时感知到与消费者连接中断,不管消费者将要对消息采取的拒绝策略是直接丢弃还是重新入队列,都是直接重新入队列

      • 手动拒绝签收消息channel.basicReject(long deliveryTag,boolean requeue)

        • 这个方法和channel.basicNack(long deliveryTag,boolean multiple,boolean requeue)的效果是一样的,只是上面的方法可以选择是否批量拒绝,这个不能选择

 

附录

  1. QPS

    Queries Per Second 是每秒查询率 ,是一台服务器每秒能够相应的查询次数,是一台特定的查询服务器每秒能够相应的查询次数,即每秒的响应请求数,也即是最大吞吐能力。

  2. TPS

    Transactions Per Second 也就是事务数/秒。一个事务是指一个客户机向服务器发送请求然后服务器做出反应的过程。客户机在发送请求时开始计时,收到服务器响应后结束计时,以此来计算使用的时间和完成的事务个数;这不就是每秒响应的请求数吗【一个页面可能有多个请求,以响应为主,10个请求,一个响应;收到一个响应算一个TPS【理解成一个客户机同一时间发出请求并接受到响应的过程算一个事务,期间可能涉及多个请求】,发送诗词请求是10RPS,如果请求都是查询请求,就是10QPS】

  3. 并发数【并发度】

    指系统同时能处理的请求数量,同样反应了系统的负载能力。这个数值可以分析机器1s内的访问日志数量来得到

    QPS(TPS)=并发数/平均响应时间【QPS(TPS)=并发数/平均响应时间;并发数:系统同时处理的request/事务数;响应时间:一般取平均响应时间】

  4. 吞吐量

    指系统在单位时间内处理请求的数量,一个系统的吞吐量(承压能力)与request(请求)对cpu的消耗,外部接口,IO等等紧密关联。

    一个系统吞吐量通常有QPS(TPS),并发数两个因素决定,每套系统这个两个值都有一个相对极限值,在应用场景访问压力下,只要某一项达到系统最高值,系统吞吐量就上不去了,如果压力继续增大,系统的吞吐量反而会下降,原因是系统超负荷工作,上下文切换,内存等等其他消耗导致系统性能下降。

  5. PV【页面访问量】

    【Page View】,即页面浏览量或点击量,用户每次刷新即被计算一次。可以统计服务一天的访问日志得到

  6. UV【独立访客】

    【Unique Visitor】统计1天内访问某站点的用户数。可以统计服务一天的访问日志并根据用户的唯一标识去重得到。

  7. RT【响应时间】

    响应时间是指系统对请求作出响应的时间,一般取平均响应时间。可以通过Nginx、Apache之类的Web Server得到。

  8. DAU【日活跃用户数量】

    【Daily Active User】常用于反映网站、互联网应用或网络游戏的运营情况。DAU通常统计一日(统计日)之内,登录或使用了某个产品的用户数(去除重复登录的用户),与UV概念相似

  9. MAU【月活跃用户数量】

    【Month Active User】指网站、app等去重后的月活跃用户数量

  10. typora快捷键

  1. 查询一下消息大小限制和队列长度限制方面相关的博文

  2. linux命令:ip addr效果和ipconfig类似,都显示ip地址